Skip to content

Commit

Permalink
Replace file reading/writing with interprocess queues for tests (#912)
Browse files Browse the repository at this point in the history
* Replace files with inter-process queues for tests

Instead of reading and writing files to communicate between
the various processes during testing, use multiprocessing
managed queues to send chess objects. This is much simpler
than serializing and deserializing text. There is no chance for
any test to overwrite communication from a different test.

* Update function/method docs

* Use non-conditional os.makedirs() function
  • Loading branch information
MarkZH authored Feb 16, 2024
1 parent 428b9ab commit 4c133b6
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 386 deletions.
2 changes: 0 additions & 2 deletions test_bot/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,3 @@ def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
"""Remove files created when testing lichess-bot."""
if os.path.exists("TEMP") and not os.getenv("GITHUB_ACTIONS"):
shutil.rmtree("TEMP")
if os.path.exists("logs"):
shutil.rmtree("logs")
103 changes: 56 additions & 47 deletions test_bot/lichess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import json
import logging
import traceback
from lib.timer import seconds, to_msec
import datetime
from queue import Queue
from typing import Union, Any, Optional, Generator
from lib.timer import to_msec
JSON_REPLY_TYPE = dict[str, Any]
REQUESTS_PAYLOAD_TYPE = dict[str, Any]

Expand All @@ -29,9 +31,18 @@ def is_final(error: Any) -> bool:
class GameStream:
"""Imitate lichess.org's GameStream. Used in tests."""

def __init__(self) -> None:
"""Initialize `self.moves_sent` to an empty string. It stores the moves that we have already sent."""
self.moves_sent = ""
def __init__(self,
board_queue: Queue[chess.Board],
clock_queue: Queue[tuple[datetime.timedelta, datetime.timedelta, datetime.timedelta]]) -> None:
"""
Capture the interprocess queues that will feed the gameStream with game information.
:param board_queue: Updated board positions from the lichess_org_simulator() function.
:param clock_queue: Updated game clock timings (white time, black time, and increment) from the
lichess_org_simulator() function.
"""
self.board_queue = board_queue
self.clock_queue = clock_queue

def iter_lines(self) -> Generator[bytes, None, None]:
"""Send the game events to lichess-bot."""
Expand Down Expand Up @@ -64,40 +75,27 @@ def iter_lines(self) -> Generator[bytes, None, None]:
"winc": 100,
"binc": 100,
"status": "started"}}).encode("utf-8")
time.sleep(1)
while True:
time.sleep(0.001)
with open("./logs/events.txt") as events:
event = events.read()
while True:
try:
with open("./logs/states.txt") as states:
state = states.read().split("\n")
moves = state[0]
board = chess.Board()
for move in moves.split():
board.push_uci(move)
wtime, btime = [seconds(float(n)) for n in state[1].split(",")]
if len(moves) <= len(self.moves_sent) and not event:
time.sleep(0.001)
continue
self.moves_sent = moves
break
except (IndexError, ValueError):
pass
time.sleep(0.1)
board = self.board_queue.get()
self.board_queue.task_done()

wtime, btime, increment = self.clock_queue.get()
self.clock_queue.task_done()

new_game_state = {"type": "gameState",
"moves": moves,
"moves": " ".join(move.uci() for move in board.move_stack),
"wtime": int(to_msec(wtime)),
"btime": int(to_msec(btime)),
"winc": 100,
"binc": 100}
if event == "end":
"winc": int(to_msec(increment)),
"binc": int(to_msec(increment))}

if board.is_game_over():
new_game_state["status"] = "outoftime"
new_game_state["winner"] = "black"
yield json.dumps(new_game_state).encode("utf-8")
break
if moves:

if board.move_stack:
new_game_state["status"] = "started"
yield json.dumps(new_game_state).encode("utf-8")

Expand All @@ -106,7 +104,11 @@ class EventStream:
"""Imitate lichess.org's EventStream. Used in tests."""

def __init__(self, sent_game: bool = False) -> None:
""":param sent_game: If we have already sent the `gameStart` event, so we don't send it again."""
"""
Start the event stream for the lichess_bot_main() loop.
:param sent_game: If we have already sent the `gameStart` event, so we don't send it again.
"""
self.sent_game = sent_game

def iter_lines(self) -> Generator[bytes, None, None]:
Expand All @@ -127,26 +129,31 @@ def iter_lines(self) -> Generator[bytes, None, None]:
class Lichess:
"""Imitate communication with lichess.org."""

def __init__(self, token: str, url: str, version: str, logging_level: int, max_retries: int) -> None:
"""Has the same parameters as `lichess.Lichess` to be able to be used in its placed without any modification."""
self.baseUrl = url
self.game_accepted = False
self.moves: list[chess.engine.PlayResult] = []
def __init__(self,
move_queue: Queue[Optional[chess.Move]],
board_queue: Queue[chess.Board],
clock_queue: Queue[tuple[datetime.timedelta, datetime.timedelta, datetime.timedelta]]) -> None:
"""
Capture the interprocess queues to distribute them to the eventStream and gameStream instances.
:param move_queue: An interprocess queue to send moves chosen by the bot under test to the mock lichess function.
:param board_queue: An interprocess queue to send board positions to the mock game stream.
:param clock_queue: An interprocess queue to send game clock information to the mock game stream.
"""
self.baseUrl = "testing"
self.move_queue = move_queue
self.board_queue = board_queue
self.clock_queue = clock_queue
self.sent_game = False
self.started_game_stream = False

def upgrade_to_bot_account(self) -> JSON_REPLY_TYPE:
"""Isn't used in tests."""
return {}

def make_move(self, game_id: str, move: chess.engine.PlayResult) -> JSON_REPLY_TYPE:
"""Write a move to `./logs/states.txt`, to be read by the opponent."""
self.moves.append(move)
uci_move = move.move.uci() if move.move else "error"
with open("./logs/states.txt") as file:
contents = file.read().split("\n")
contents[0] += f" {uci_move}"
with open("./logs/states.txt", "w") as file:
file.write("\n".join(contents))
"""Send a move to the opponent engine thread."""
self.move_queue.put(move.move)
return {}

def chat(self, game_id: str, room: str, text: str) -> JSON_REPLY_TYPE:
Expand All @@ -165,11 +172,13 @@ def get_event_stream(self) -> EventStream:

def get_game_stream(self, game_id: str) -> GameStream:
"""Send the `GameStream`."""
return GameStream()
if self.started_game_stream:
self.move_queue.put(None)
self.started_game_stream = True
return GameStream(self.board_queue, self.clock_queue)

def accept_challenge(self, challenge_id: str) -> JSON_REPLY_TYPE:
"""Set `self.game_accepted` to true."""
self.game_accepted = True
"""Isn't used in tests."""
return {}

def decline_challenge(self, challenge_id: str, reason: str = "generic") -> JSON_REPLY_TYPE:
Expand Down
Loading

0 comments on commit 4c133b6

Please sign in to comment.