diff --git a/lib/conversation.py b/lib/conversation.py index 79450f8fa..fb4629cc4 100644 --- a/lib/conversation.py +++ b/lib/conversation.py @@ -1,12 +1,15 @@ """Allows lichess-bot to send messages to the chat.""" from __future__ import annotations import logging +import test_bot.lichess from lib import model from lib.engine_wrapper import EngineWrapper -from lib.lichess import Lichess +from lib import lichess from collections.abc import Sequence from lib.timer import seconds +from typing import Union MULTIPROCESSING_LIST_TYPE = Sequence[model.Challenge] +LICHESS_TYPE = Union[lichess.Lichess, test_bot.lichess.Lichess] logger = logging.getLogger(__name__) @@ -14,7 +17,7 @@ class Conversation: """Enables the bot to communicate with its opponent and the spectators.""" - def __init__(self, game: model.Game, engine: EngineWrapper, li: Lichess, version: str, + def __init__(self, game: model.Game, engine: EngineWrapper, li: LICHESS_TYPE, version: str, challenge_queue: MULTIPROCESSING_LIST_TYPE) -> None: """ Communication between lichess-bot and the game chats. diff --git a/lib/engine_wrapper.py b/lib/engine_wrapper.py index 649f77b6c..c0a202f84 100644 --- a/lib/engine_wrapper.py +++ b/lib/engine_wrapper.py @@ -12,6 +12,7 @@ import time import random import math +import test_bot.lichess from collections import Counter from collections.abc import Callable from lib import config, model, lichess @@ -25,6 +26,7 @@ LICHESS_EGTB_MOVE = dict[str, Any] CHESSDB_EGTB_MOVE = dict[str, Any] MOVE = Union[chess.engine.PlayResult, list[chess.Move]] +LICHESS_TYPE = Union[lichess.Lichess, test_bot.lichess.Lichess] logger = logging.getLogger(__name__) @@ -110,7 +112,7 @@ def __exit__(self, exc_type: Optional[Type[BaseException]], def play_move(self, board: chess.Board, game: model.Game, - li: lichess.Lichess, + li: LICHESS_TYPE, setup_timer: Timer, move_overhead: datetime.timedelta, can_ponder: bool, @@ -585,6 +587,9 @@ def method(*args: Any, **kwargs: Any) -> Any: return method +test_suffix = "-for-lichess-bot-testing-only" + + def getHomemadeEngine(name: str) -> type[MinimalEngine]: """ Get the homemade engine with name `name`. e.g. If `name` is `RandomMove` then we will return `strategies.RandomMove`. @@ -593,7 +598,12 @@ def getHomemadeEngine(name: str) -> type[MinimalEngine]: :return: The engine with this name. """ from lib import strategies - engine: type[MinimalEngine] = getattr(strategies, name) + from test_bot import strategies as test_strategies + engine: type[MinimalEngine] + if name.endswith(test_suffix): # Test only. + engine = getattr(test_strategies, name.removesuffix(test_suffix)) + else: + engine = getattr(strategies, name) return engine @@ -728,7 +738,7 @@ def get_book_move(board: chess.Board, game: model.Game, return no_book_move -def get_online_move(li: lichess.Lichess, board: chess.Board, game: model.Game, online_moves_cfg: config.Configuration, +def get_online_move(li: LICHESS_TYPE, board: chess.Board, game: model.Game, online_moves_cfg: config.Configuration, draw_or_resign_cfg: config.Configuration) -> Union[chess.engine.PlayResult, list[chess.Move]]: """ Get a move from an online source. @@ -780,7 +790,7 @@ def get_online_move(li: lichess.Lichess, board: chess.Board, game: model.Game, o return chess.engine.PlayResult(None, None) -def get_chessdb_move(li: lichess.Lichess, board: chess.Board, game: model.Game, +def get_chessdb_move(li: LICHESS_TYPE, board: chess.Board, game: model.Game, chessdb_cfg: config.Configuration) -> tuple[Optional[str], chess.engine.InfoDict]: """Get a move from chessdb.cn's opening book.""" wb = "w" if board.turn == chess.WHITE else "b" @@ -822,7 +832,7 @@ def get_chessdb_move(li: lichess.Lichess, board: chess.Board, game: model.Game, return move, comment -def get_lichess_cloud_move(li: lichess.Lichess, board: chess.Board, game: model.Game, +def get_lichess_cloud_move(li: LICHESS_TYPE, board: chess.Board, game: model.Game, lichess_cloud_cfg: config.Configuration) -> tuple[Optional[str], chess.engine.InfoDict]: """Get a move from the lichess's cloud analysis.""" wb = "w" if board.turn == chess.WHITE else "b" @@ -876,7 +886,7 @@ def get_lichess_cloud_move(li: lichess.Lichess, board: chess.Board, game: model. return move, comment -def get_opening_explorer_move(li: lichess.Lichess, board: chess.Board, game: model.Game, +def get_opening_explorer_move(li: LICHESS_TYPE, board: chess.Board, game: model.Game, opening_explorer_cfg: config.Configuration ) -> tuple[Optional[str], chess.engine.InfoDict]: """Get a move from lichess's opening explorer.""" @@ -926,7 +936,7 @@ def get_opening_explorer_move(li: lichess.Lichess, board: chess.Board, game: mod return move, comment -def get_online_egtb_move(li: lichess.Lichess, board: chess.Board, game: model.Game, online_egtb_cfg: config.Configuration +def get_online_egtb_move(li: LICHESS_TYPE, board: chess.Board, game: model.Game, online_egtb_cfg: config.Configuration ) -> tuple[Union[str, list[str], None], int, chess.engine.InfoDict]: """ Get a move from an online egtb (either by lichess or chessdb). @@ -992,7 +1002,7 @@ def get_egtb_move(board: chess.Board, game: model.Game, lichess_bot_tbs: config. return chess.engine.PlayResult(None, None) -def get_lichess_egtb_move(li: lichess.Lichess, game: model.Game, board: chess.Board, quality: str, +def get_lichess_egtb_move(li: LICHESS_TYPE, game: model.Game, board: chess.Board, quality: str, variant: str) -> tuple[Union[str, list[str], None], int, chess.engine.InfoDict]: """ Get a move from lichess's egtb. @@ -1045,7 +1055,7 @@ def good_enough(possible_move: LICHESS_EGTB_MOVE) -> bool: return None, -3, {} -def get_chessdb_egtb_move(li: lichess.Lichess, game: model.Game, board: chess.Board, +def get_chessdb_egtb_move(li: LICHESS_TYPE, game: model.Game, board: chess.Board, quality: str) -> tuple[Union[str, list[str], None], int, chess.engine.InfoDict]: """ Get a move from chessdb's egtb. diff --git a/lib/matchmaking.py b/lib/matchmaking.py index 86e0f0901..82d827245 100644 --- a/lib/matchmaking.py +++ b/lib/matchmaking.py @@ -1,18 +1,20 @@ """Challenge other bots.""" import random import logging +import datetime +import test_bot.lichess from lib import model from lib.timer import Timer, seconds, minutes, days from collections import defaultdict from collections.abc import Sequence from lib import lichess -import datetime from lib.config import Configuration, FilterType -from typing import Any, Optional +from typing import Any, Optional, Union USER_PROFILE_TYPE = dict[str, Any] EVENT_TYPE = dict[str, Any] MULTIPROCESSING_LIST_TYPE = Sequence[model.Challenge] DAILY_TIMERS_TYPE = list[Timer] +LICHESS_TYPE = Union[lichess.Lichess, test_bot.lichess.Lichess] logger = logging.getLogger(__name__) @@ -43,7 +45,7 @@ def write_daily_challenges(daily_challenges: DAILY_TIMERS_TYPE) -> None: class Matchmaking: """Challenge other bots.""" - def __init__(self, li: lichess.Lichess, config: Configuration, user_profile: USER_PROFILE_TYPE) -> None: + def __init__(self, li: LICHESS_TYPE, config: Configuration, user_profile: USER_PROFILE_TYPE) -> None: """Initialize values needed for matchmaking.""" self.li = li self.variants = list(filter(lambda variant: variant != "fromPosition", config.challenge.variants)) diff --git a/lichess-bot.py b/lichess-bot.py index ba1e042f0..1cafe7790 100644 --- a/lichess-bot.py +++ b/lichess-bot.py @@ -19,6 +19,7 @@ import sys import yaml import traceback +import test_bot.lichess from lib.config import load_config, Configuration from lib.conversation import Conversation, ChatLine from lib.timer import Timer, seconds, msec, hours, to_seconds @@ -40,6 +41,7 @@ LOGGING_QUEUE_TYPE = Queue[logging.LogRecord] MULTIPROCESSING_LIST_TYPE = MutableSequence[model.Challenge] POOL_TYPE = Pool +LICHESS_TYPE = Union[lichess.Lichess, test_bot.lichess.Lichess] logger = logging.getLogger(__name__) @@ -68,7 +70,7 @@ def signal_handler(signal: int, frame: Any) -> None: signal.signal(signal.SIGINT, signal_handler) -def upgrade_account(li: lichess.Lichess) -> bool: +def upgrade_account(li: LICHESS_TYPE) -> bool: """Upgrade the account to a BOT account.""" if li.upgrade_to_bot_account() is None: return False @@ -77,7 +79,7 @@ def upgrade_account(li: lichess.Lichess) -> bool: return True -def watch_control_stream(control_queue: CONTROL_QUEUE_TYPE, li: lichess.Lichess) -> None: +def watch_control_stream(control_queue: CONTROL_QUEUE_TYPE, li: LICHESS_TYPE) -> None: """Put the events in a queue.""" error = None while not terminated: @@ -189,7 +191,7 @@ def thread_logging_configurer(queue: Union[CONTROL_QUEUE_TYPE, LOGGING_QUEUE_TYP root.setLevel(logging.DEBUG) -def start(li: lichess.Lichess, user_profile: USER_PROFILE_TYPE, config: Configuration, logging_level: int, +def start(li: LICHESS_TYPE, user_profile: USER_PROFILE_TYPE, config: Configuration, logging_level: int, log_filename: Optional[str], auto_log_filename: Optional[str], one_game: bool = False) -> None: """ Start lichess-bot. @@ -253,7 +255,7 @@ def log_proc_count(change: str, active_games: set[str]) -> None: logger.info(f"{symbol} Process {change}. Count: {len(active_games)}. IDs: {active_games or None}") -def lichess_bot_main(li: lichess.Lichess, +def lichess_bot_main(li: LICHESS_TYPE, user_profile: USER_PROFILE_TYPE, config: Configuration, challenge_queue: MULTIPROCESSING_LIST_TYPE, @@ -407,7 +409,7 @@ def start_low_time_games(low_time_games: list[EVENT_GETATTR_GAME_TYPE], active_g start_game_thread(active_games, game_id, play_game_args, pool) -def accept_challenges(li: lichess.Lichess, challenge_queue: MULTIPROCESSING_LIST_TYPE, active_games: set[str], +def accept_challenges(li: LICHESS_TYPE, challenge_queue: MULTIPROCESSING_LIST_TYPE, active_games: set[str], max_games: int) -> None: """Accept a challenge.""" while len(active_games) < max_games and challenge_queue: @@ -425,7 +427,7 @@ def accept_challenges(li: lichess.Lichess, challenge_queue: MULTIPROCESSING_LIST logger.info(f"Skip missing {chlng}") -def check_online_status(li: lichess.Lichess, user_profile: USER_PROFILE_TYPE, last_check_online_time: Timer) -> None: +def check_online_status(li: LICHESS_TYPE, user_profile: USER_PROFILE_TYPE, last_check_online_time: Timer) -> None: """Check if lichess.org thinks the bot is online or not. If it isn't, we restart it.""" global restart @@ -452,7 +454,7 @@ def sort_challenges(challenge_queue: MULTIPROCESSING_LIST_TYPE, challenge_config challenge_queue[:] = list_c -def game_is_active(li: lichess.Lichess, game_id: str) -> bool: +def game_is_active(li: LICHESS_TYPE, game_id: str) -> bool: """Determine if a game is still being played.""" return game_id in (ongoing_game["gameId"] for ongoing_game in li.get_ongoing_games()) @@ -517,7 +519,7 @@ def enough_time_to_queue(event: EVENT_TYPE, config: Configuration) -> bool: return not game["isMyTurn"] or game.get("secondsLeft", math.inf) > minimum_time -def handle_challenge(event: EVENT_TYPE, li: lichess.Lichess, challenge_queue: MULTIPROCESSING_LIST_TYPE, +def handle_challenge(event: EVENT_TYPE, li: LICHESS_TYPE, challenge_queue: MULTIPROCESSING_LIST_TYPE, challenge_config: Configuration, user_profile: USER_PROFILE_TYPE, recent_bot_challenges: defaultdict[str, list[Timer]]) -> None: """Handle incoming challenges. It either accepts, declines, or queues them to accept later.""" @@ -538,7 +540,7 @@ def handle_challenge(event: EVENT_TYPE, li: lichess.Lichess, challenge_queue: MU @backoff.on_exception(backoff.expo, BaseException, max_time=600, giveup=lichess.is_final, # type: ignore[arg-type] on_backoff=lichess.backoff_handler) -def play_game(li: lichess.Lichess, +def play_game(li: LICHESS_TYPE, game_id: str, control_queue: CONTROL_QUEUE_TYPE, user_profile: USER_PROFILE_TYPE, @@ -719,7 +721,7 @@ def is_game_over(game: model.Game) -> bool: return status != "started" -def should_exit_game(board: chess.Board, game: model.Game, prior_game: Optional[model.Game], li: lichess.Lichess, +def should_exit_game(board: chess.Board, game: model.Game, prior_game: Optional[model.Game], li: LICHESS_TYPE, is_correspondence: bool) -> bool: """Whether we should exit a game.""" if (is_correspondence @@ -807,7 +809,7 @@ def tell_user_game_result(game: model.Game, board: chess.Board) -> None: logger.info(f"Game ended by {termination}") -def try_get_pgn_game_record(li: lichess.Lichess, config: Configuration, game: model.Game, board: chess.Board, +def try_get_pgn_game_record(li: LICHESS_TYPE, config: Configuration, game: model.Game, board: chess.Board, engine: engine_wrapper.EngineWrapper) -> str: """ Call `print_pgn_game_record` to write the game to a PGN file and handle errors raised by it. @@ -825,7 +827,7 @@ def try_get_pgn_game_record(li: lichess.Lichess, config: Configuration, game: mo return "" -def pgn_game_record(li: lichess.Lichess, config: Configuration, game: model.Game, board: chess.Board, +def pgn_game_record(li: LICHESS_TYPE, config: Configuration, game: model.Game, board: chess.Board, engine: engine_wrapper.EngineWrapper) -> str: """ Return the text of the game's PGN. diff --git a/test_bot/conftest.py b/test_bot/conftest.py index 5317cdc65..5563bb29a 100644 --- a/test_bot/conftest.py +++ b/test_bot/conftest.py @@ -6,8 +6,6 @@ def pytest_sessionfinish(session: Any, exitstatus: Any) -> None: """Remove files created when testing lichess-bot.""" - shutil.copyfile("lib/correct_lichess.py", "lib/lichess.py") - os.remove("lib/correct_lichess.py") if os.path.exists("TEMP") and not os.getenv("GITHUB_ACTIONS"): shutil.rmtree("TEMP") if os.path.exists("logs"): diff --git a/test_bot/lichess.py b/test_bot/lichess.py index de6dca52b..e2555b8b1 100644 --- a/test_bot/lichess.py +++ b/test_bot/lichess.py @@ -7,6 +7,8 @@ import traceback from lib.timer import seconds, to_msec from typing import Union, Any, Optional, Generator +JSON_REPLY_TYPE = dict[str, Any] +REQUESTS_PAYLOAD_TYPE = dict[str, Any] logger = logging.getLogger(__name__) @@ -125,18 +127,18 @@ def iter_lines(self) -> Generator[bytes, None, None]: class Lichess: """Imitate communication with lichess.org.""" - def __init__(self, token: str, url: str, version: str) -> None: + def __init__(self, token: str, url: str, version: str, logging_level: int, max_retries: int) -> None: """Has the same parameters as `lichess.Lichess` to be able to be used in its placed without any modification.""" self.baseUrl = url self.game_accepted = False self.moves: list[chess.engine.PlayResult] = [] self.sent_game = False - def upgrade_to_bot_account(self) -> None: + def upgrade_to_bot_account(self) -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} - def make_move(self, game_id: str, move: chess.engine.PlayResult) -> None: + def make_move(self, game_id: str, move: chess.engine.PlayResult) -> JSON_REPLY_TYPE: """Write a move to `./logs/states.txt`, to be read by the opponent.""" self.moves.append(move) uci_move = move.move.uci() if move.move else "error" @@ -145,14 +147,15 @@ def make_move(self, game_id: str, move: chess.engine.PlayResult) -> None: contents[0] += f" {uci_move}" with open("./logs/states.txt", "w") as file: file.write("\n".join(contents)) + return {} - def chat(self, game_id: str, room: str, text: str) -> None: + def chat(self, game_id: str, room: str, text: str) -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} - def abort(self, game_id: str) -> None: + def abort(self, game_id: str) -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} def get_event_stream(self) -> EventStream: """Send the `EventStream`.""" @@ -164,13 +167,14 @@ def get_game_stream(self, game_id: str) -> GameStream: """Send the `GameStream`.""" return GameStream() - def accept_challenge(self, challenge_id: str) -> None: + def accept_challenge(self, challenge_id: str) -> JSON_REPLY_TYPE: """Set `self.game_accepted` to true.""" self.game_accepted = True + return {} - def decline_challenge(self, challenge_id: str, reason: str = "generic") -> None: + def decline_challenge(self, challenge_id: str, reason: str = "generic") -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} def get_profile(self) -> dict[str, Union[str, bool, dict[str, str]]]: """Return a simple profile for the bot that lichess-bot uses when testing.""" @@ -185,7 +189,7 @@ def get_profile(self) -> dict[str, Union[str, bool, dict[str, str]]]: "followsYou": False, "perfs": {}} - def get_ongoing_games(self) -> list[str]: + def get_ongoing_games(self) -> list[dict[str, Any]]: """Return that the bot isn't playing a game.""" return [] @@ -211,18 +215,22 @@ def get_online_bots(self) -> list[dict[str, Union[str, bool]]]: """Return that the only bot online is us.""" return [{"username": "b", "online": True}] - def challenge(self, username: str, params: dict[str, str]) -> None: + def challenge(self, username: str, payload: REQUESTS_PAYLOAD_TYPE) -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} - def cancel(self, challenge_id: str) -> None: + def cancel(self, challenge_id: str) -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} - def online_book_get(self, path: str, params: Optional[dict[str, str]] = None) -> None: + def online_book_get(self, path: str, params: Optional[dict[str, Any]] = None, stream: bool = False) -> JSON_REPLY_TYPE: """Isn't used in tests.""" - return + return {} def is_online(self, user_id: str) -> bool: """Return that a bot is online.""" return True + + def get_public_data(self, user_name: str) -> JSON_REPLY_TYPE: + """Isn't used in tests.""" + return {} diff --git a/test_bot/strategies.py b/test_bot/strategies.py new file mode 100644 index 000000000..34a269d47 --- /dev/null +++ b/test_bot/strategies.py @@ -0,0 +1,28 @@ +"""Homemade engine using Stockfish (used in testing).""" +from lib.strategies import ExampleEngine +import chess +import chess.engine +import sys +from lib.config import Configuration +from typing import Any, Optional, Union +OPTIONS_TYPE = dict[str, Any] +COMMANDS_TYPE = list[str] +MOVE = Union[chess.engine.PlayResult, list[chess.Move]] + +platform = sys.platform +file_extension = ".exe" if platform == "win32" else "" + + +class Stockfish(ExampleEngine): + """A homemade engine that uses Stockfish.""" + + def __init__(self, commands: COMMANDS_TYPE, options: OPTIONS_TYPE, stderr: Optional[int], + draw_or_resign: Configuration, **popen_args: str): + """Start Stockfish.""" + super().__init__(commands, options, stderr, draw_or_resign, **popen_args) + self.engine = chess.engine.SimpleEngine.popen_uci(f"./TEMP/sf{file_extension}") + + def search(self, board: chess.Board, time_limit: chess.engine.Limit, ponder: bool, draw_offered: bool, + root_moves: MOVE) -> chess.engine.PlayResult: + """Get a move using Stockfish.""" + return self.engine.play(board, time_limit) diff --git a/test_bot/test_bot.py b/test_bot/test_bot.py index 4e95f54f2..9b2bc25b3 100644 --- a/test_bot/test_bot.py +++ b/test_bot/test_bot.py @@ -12,14 +12,14 @@ import stat import shutil import importlib -from lib import config import tarfile +import test_bot.lichess +from lib import config from lib.timer import Timer, to_seconds, seconds from typing import Any -if __name__ == "__main__": +from lib.engine_wrapper import test_suffix +if "pytest" not in sys.modules: sys.exit(f"The script {os.path.basename(__file__)} should only be run by pytest.") -shutil.copyfile("lib/lichess.py", "lib/correct_lichess.py") -shutil.copyfile("test_bot/lichess.py", "lib/lichess.py") lichess_bot = importlib.import_module("lichess-bot") platform = sys.platform @@ -185,7 +185,7 @@ def run_bot(raw_config: dict[str, Any], logging_level: int, opponent_path: str = config.insert_default_values(raw_config) CONFIG = config.Configuration(raw_config) lichess_bot.logger.info(lichess_bot.intro()) - li = lichess_bot.lichess.Lichess(CONFIG.token, CONFIG.url, lichess_bot.__version__) + li = test_bot.lichess.Lichess(CONFIG.token, CONFIG.url, lichess_bot.__version__, logging_level, 1) user_profile = li.get_profile() username = user_profile["username"] @@ -287,34 +287,17 @@ def test_homemade() -> None: if platform != "linux" and platform != "win32": assert True return - strategies_py = "lib/strategies.py" - with open(strategies_py) as file: - original_strategies = file.read() - - with open(strategies_py, "a") as file: - file.write(f""" -class Stockfish(ExampleEngine): - def __init__(self, commands, options, stderr, draw_or_resign, **popen_args): - super().__init__(commands, options, stderr, draw_or_resign, **popen_args) - import chess - self.engine = chess.engine.SimpleEngine.popen_uci('{stockfish_path}') - - def search(self, board, time_limit, *args): - return self.engine.play(board, time_limit) -""") if os.path.exists("logs"): shutil.rmtree("logs") os.mkdir("logs") with open("./config.yml.default") as file: CONFIG = yaml.safe_load(file) CONFIG["token"] = "" - CONFIG["engine"]["name"] = "Stockfish" + CONFIG["engine"]["name"] = f"Stockfish{test_suffix}" CONFIG["engine"]["protocol"] = "homemade" CONFIG["pgn_directory"] = "TEMP/homemade_game_record" win = run_bot(CONFIG, logging_level) shutil.rmtree("logs") - with open(strategies_py, "w") as file: - file.write(original_strategies) lichess_bot.logger.info("Finished Testing Homemade") assert win == "1" assert os.path.isfile(os.path.join(CONFIG["pgn_directory"],