Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace file reading/writing with interprocess queues for tests #912

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions test_bot/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,3 @@ def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
"""Remove files created when testing lichess-bot."""
if os.path.exists("TEMP") and not os.getenv("GITHUB_ACTIONS"):
shutil.rmtree("TEMP")
if os.path.exists("logs"):
shutil.rmtree("logs")
103 changes: 56 additions & 47 deletions test_bot/lichess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import json
import logging
import traceback
from lib.timer import seconds, to_msec
import datetime
from queue import Queue
from typing import Union, Any, Optional, Generator
from lib.timer import to_msec
JSON_REPLY_TYPE = dict[str, Any]
REQUESTS_PAYLOAD_TYPE = dict[str, Any]

Expand All @@ -29,9 +31,18 @@ def is_final(error: Any) -> bool:
class GameStream:
"""Imitate lichess.org's GameStream. Used in tests."""

def __init__(self) -> None:
"""Initialize `self.moves_sent` to an empty string. It stores the moves that we have already sent."""
self.moves_sent = ""
def __init__(self,
board_queue: Queue[chess.Board],
clock_queue: Queue[tuple[datetime.timedelta, datetime.timedelta, datetime.timedelta]]) -> None:
"""
Capture the interprocess queues that will feed the gameStream with game information.

:param board_queue: Updated board positions from the lichess_org_simulator() function.
:param clock_queue: Updated game clock timings (white time, black time, and increment) from the
lichess_org_simulator() function.
"""
self.board_queue = board_queue
self.clock_queue = clock_queue

def iter_lines(self) -> Generator[bytes, None, None]:
"""Send the game events to lichess-bot."""
Expand Down Expand Up @@ -64,40 +75,27 @@ def iter_lines(self) -> Generator[bytes, None, None]:
"winc": 100,
"binc": 100,
"status": "started"}}).encode("utf-8")
time.sleep(1)
while True:
time.sleep(0.001)
with open("./logs/events.txt") as events:
event = events.read()
while True:
try:
with open("./logs/states.txt") as states:
state = states.read().split("\n")
moves = state[0]
board = chess.Board()
for move in moves.split():
board.push_uci(move)
wtime, btime = [seconds(float(n)) for n in state[1].split(",")]
if len(moves) <= len(self.moves_sent) and not event:
time.sleep(0.001)
continue
self.moves_sent = moves
break
except (IndexError, ValueError):
pass
time.sleep(0.1)
board = self.board_queue.get()
self.board_queue.task_done()

wtime, btime, increment = self.clock_queue.get()
self.clock_queue.task_done()

new_game_state = {"type": "gameState",
"moves": moves,
"moves": " ".join(move.uci() for move in board.move_stack),
"wtime": int(to_msec(wtime)),
"btime": int(to_msec(btime)),
"winc": 100,
"binc": 100}
if event == "end":
"winc": int(to_msec(increment)),
"binc": int(to_msec(increment))}

if board.is_game_over():
new_game_state["status"] = "outoftime"
new_game_state["winner"] = "black"
yield json.dumps(new_game_state).encode("utf-8")
break
if moves:

if board.move_stack:
new_game_state["status"] = "started"
yield json.dumps(new_game_state).encode("utf-8")

Expand All @@ -106,7 +104,11 @@ class EventStream:
"""Imitate lichess.org's EventStream. Used in tests."""

def __init__(self, sent_game: bool = False) -> None:
""":param sent_game: If we have already sent the `gameStart` event, so we don't send it again."""
"""
Start the event stream for the lichess_bot_main() loop.

:param sent_game: If we have already sent the `gameStart` event, so we don't send it again.
"""
self.sent_game = sent_game

def iter_lines(self) -> Generator[bytes, None, None]:
Expand All @@ -127,26 +129,31 @@ def iter_lines(self) -> Generator[bytes, None, None]:
class Lichess:
"""Imitate communication with lichess.org."""

def __init__(self, token: str, url: str, version: str, logging_level: int, max_retries: int) -> None:
"""Has the same parameters as `lichess.Lichess` to be able to be used in its placed without any modification."""
self.baseUrl = url
self.game_accepted = False
self.moves: list[chess.engine.PlayResult] = []
def __init__(self,
move_queue: Queue[Optional[chess.Move]],
board_queue: Queue[chess.Board],
clock_queue: Queue[tuple[datetime.timedelta, datetime.timedelta, datetime.timedelta]]) -> None:
"""
Capture the interprocess queues to distribute them to the eventStream and gameStream instances.

:param move_queue: An interprocess queue to send moves chosen by the bot under test to the mock lichess function.
:param board_queue: An interprocess queue to send board positions to the mock game stream.
:param clock_queue: An interprocess queue to send game clock information to the mock game stream.
"""
self.baseUrl = "testing"
self.move_queue = move_queue
self.board_queue = board_queue
self.clock_queue = clock_queue
self.sent_game = False
self.started_game_stream = False

def upgrade_to_bot_account(self) -> JSON_REPLY_TYPE:
"""Isn't used in tests."""
return {}

def make_move(self, game_id: str, move: chess.engine.PlayResult) -> JSON_REPLY_TYPE:
"""Write a move to `./logs/states.txt`, to be read by the opponent."""
self.moves.append(move)
uci_move = move.move.uci() if move.move else "error"
with open("./logs/states.txt") as file:
contents = file.read().split("\n")
contents[0] += f" {uci_move}"
with open("./logs/states.txt", "w") as file:
file.write("\n".join(contents))
"""Send a move to the opponent engine thread."""
self.move_queue.put(move.move)
return {}

def chat(self, game_id: str, room: str, text: str) -> JSON_REPLY_TYPE:
Expand All @@ -165,11 +172,13 @@ def get_event_stream(self) -> EventStream:

def get_game_stream(self, game_id: str) -> GameStream:
"""Send the `GameStream`."""
return GameStream()
if self.started_game_stream:
self.move_queue.put(None)
self.started_game_stream = True
return GameStream(self.board_queue, self.clock_queue)

def accept_challenge(self, challenge_id: str) -> JSON_REPLY_TYPE:
"""Set `self.game_accepted` to true."""
self.game_accepted = True
"""Isn't used in tests."""
return {}

def decline_challenge(self, challenge_id: str, reason: str = "generic") -> JSON_REPLY_TYPE:
Expand Down
Loading
Loading