From 10d3df3277902d6a8817ba52c79168586548ce0f Mon Sep 17 00:00:00 2001 From: Steven Date: Thu, 13 Jun 2024 22:53:28 -0700 Subject: [PATCH] Format code and remove animations --- animation/scene.py | 927 ----------------------------------------- requirements.txt | 1 + research/kuhn/main.py | 358 ++++++++-------- research/leduc/card.py | 16 +- src/README.md | 3 +- src/base.py | 876 ++++++++++++++++++++------------------ src/kuhn.py | 244 +++++------ src/train.py | 506 ++++++++++++---------- 8 files changed, 1083 insertions(+), 1848 deletions(-) delete mode 100644 animation/scene.py diff --git a/animation/scene.py b/animation/scene.py deleted file mode 100644 index 2c99440..0000000 --- a/animation/scene.py +++ /dev/null @@ -1,927 +0,0 @@ -from manim import * -import networkx as nx -""" -Lif of animations I need to make: -- [] Text: Nash Equibrium + Definition -- [] Animation: Rock-Paper-Scissors visualized -- [] Animation: Game tree for chess, animated -- [] Animation: Regret Matching algorithm -- [] Text: Counterfactual Regret Minimization -- [] Animation: actual CFR Algorithm -- [] Animation: Card Abstraction? And Bet Abstraction? 
-""" - -# Some options of background colors -# config.background_color = "#151213" # Dark brown Gray -# config.background_color = "#262627" # Gray -# config.background_color = "#121212" # Dark Gray -# config.background_color = PURE_GREEN -config.background_color = BLACK -Text.set_default(font='Shadows Into Light', color=BLACK) - - -from manim import * -import sys -from tqdm import tqdm - -sys.path.append("../src") -import base -from base import Player, Action, History, InfoSet -from kuhn import create_history, create_infoSet -import random -from typing import NewType, Dict, List, Callable, cast -import copy - - -class ManimCFR(base.CFR): - """ - I want to see the algorithm in action when I call `solve()` - """ - def __init__(self, create_infoSet, create_history, n_players: int = 2, iterations: int = 100, tracker_interval: int = 10): - super().__init__(create_infoSet, create_history, n_players, iterations, tracker_interval) - -# config.disable_caching = True - -class KuhnPart1(Scene): - def construct(self): - - - background = ImageMobject('assets/poker-table-background.jpeg').scale(2) - # background = ImageMobject('assets/table2.jpeg').scale(2) - self.add(background) - - kuhnPoker = Text("Kuhn Poker", color=WHITE).scale(2.5) - self.play(Write(kuhnPoker)) - - self.wait(2) - self.play(FadeOut(kuhnPoker)) - - ace = ImageMobject('../assets/Ac.png').shift(3*LEFT) - king = ImageMobject('../assets/Kc.png') - queen = ImageMobject('../assets/Qc.png').shift(3*RIGHT) - - self.play(FadeIn(ace, run_time=0.5)) - self.play(FadeIn(king, run_time=0.5)) - self.play(FadeIn(queen, run_time=0.5)) - - self.wait(1) - - # youText = Text("You", color=WHITE, font_size=34).move_to(2*RIGHT) - # opponentText = Tex("Opponent", color=WHITE, font_size=34).move_to(2*DOWN + 2*RIGHT) - # self.play(AnimationGroup(king.animate.shift(2*RIGHT), Write(youText), lag_ratio=0.8)) - # self.wait(0.5) - - # Queen or Ace?? 
- # self.play(AnimationGroup(AnimationGroup(queen.animate.shift(2*DOWN + 2*RIGHT).scale(1.5), FadeOut(back)), Write(opponentText), lag_ratio=0.8)) - - # ace.scale(0.75).move_to(0.5 * DOWN + 2*RIGHT) - # self.play(AnimationGroup(FadeIn(ace), FadeOut(queen), run_time=0.8)) - # self.play(AnimationGroup(FadeIn(queen), FadeOut(ace), run_time=0.8)) - # self.play(AnimationGroup(FadeIn(ace), FadeOut(queen), run_time=0.8)) - - # unknown = ImageMobject('../assets/unknown.png').scale(0.75).move_to(0.5 * DOWN + 2*RIGHT) - # self.play(AnimationGroup(FadeIn(unknown), FadeOut(ace), run_time=0.8)) - - # self.play(ace.animate.shift(2*DOWN + 2*RIGHT).scale(1.5), queen.animate.shift(2*DOWN + 4*RIGHT).scale(1.5)) - - - -config.background_color = "#1c5d2c" -class Kuhn(MovingCameraScene): - def construct(self): - - - back = ImageMobject('../assets/back.png') - king = ImageMobject('../assets/Kc.png').shift(UP).scale(0.6) - - self.add(king) - - - check = Tex("Check", color=WHITE, font_size=34).move_to(1.5*LEFT +1.2 * DOWN) - bet = Tex("Bet", color=WHITE, font_size=34).move_to(1.5*RIGHT + 1.2 * DOWN) - - check_line = Line(king.get_center() + 0.7 * DOWN, check.get_center(), color=WHITE, buff=0.5) - bet_line = Line(king.get_center() + 0.7 * DOWN, bet.get_center(), color=WHITE, buff=0.5) - - self.play(AnimationGroup(Create(check_line), Write(check), lag_ratio=0.5, run_time=1)) - self.wait(0.3) - self.play(AnimationGroup(Create(bet_line), Write(bet), lag_ratio=0.5, run_time=1)) - - self.wait(3) - left_fifty = Tex("50\%", color=WHITE, font_size=24).move_to(check_line.get_center() + 0.25*UP + 0.25 * LEFT) - right_fifty = Tex("50\%", color=WHITE, font_size=24).move_to(bet_line.get_center() + 0.25 *UP + 0.25 * RIGHT) - self.play(Write(left_fifty), Write(right_fifty)) - self.wait(1) - - # self.play(bet_line.animate.set_stroke_width(7), bet.animate.set_font_size(45)) - unknown = ImageMobject('../assets/unknown.png').shift(UP).scale(0.6) - question_text_left = Tex("?", 
font_size=30).move_to(left_fifty.get_center()).set_opacity(0.3) - question_text_right = Tex("?", font_size=30).move_to(right_fifty.get_center()) - - - bet_underlined = Underline(bet) - bet_underlined.set_stroke(width=2) - self.play(VGroup(check_line, check, left_fifty).animate.set_opacity(0.3), king.animate.set_fill_opacity(0.3), - self.camera.frame.animate.move_to(bet.get_center()).set(width=10), FadeIn(unknown), - # Transform(left_fifty, question_text_left), Transform(right_fifty, question_text_right), - Write(bet_underlined) - ) - - self.wait(2) - - - call = Tex("Call", color=WHITE, font_size=34).move_to(bet.get_center()+ LEFT +1.3*DOWN) - fold = Tex("Fold", color=WHITE, font_size=34).move_to(bet.get_center()+ RIGHT + 1.3*DOWN) - - call_line = Line(bet.get_center() + 0.1 * DOWN, call.get_center(), color=WHITE, buff=0.4) - fold_line = Line(bet.get_center() + 0.1 * DOWN, fold.get_center(), color=WHITE, buff=0.4) - new_left_fifty = Tex("50\%", color=WHITE, font_size=24).move_to(call_line.get_center() + 0.25*UP + 0.25 * LEFT) - new_right_fifty = Tex("50\%", color=WHITE, font_size=24).move_to(fold_line.get_center() + 0.25 *UP + 0.25 * RIGHT) - self.play(AnimationGroup(Create(call_line), Write(new_left_fifty), Write(call), lag_ratio=0.5, run_time=1)) - self.wait(0.3) - self.play(AnimationGroup(Create(fold_line), Write(new_right_fifty), Write(fold), lag_ratio=0.5, run_time=1)) - - - self.wait(1) - - # self.play(bet_line.animate.set_stroke_width(7), bet.animate.set_font_size(45)) - call_underlined = Underline(call) - call_underlined.set_stroke(width=2) - - bet_underlined.save_state() - self.play(VGroup(new_right_fifty, fold, fold_line).animate.set_opacity(0.3), - Write(call_underlined), - Uncreate(bet_underlined), - self.camera.frame.animate.move_to(call.get_center()).set(width=10) - ) - self.wait(2) - - queen = ImageMobject('../assets/Qc.png').move_to(king.get_center() + 3 *UP).scale(0.6) - qkline = Line(queen.get_center(), king.get_center(), color=WHITE, buff=1) - - 
opponent = Text("OPPONENT", color=WHITE, font_size=34).move_to(queen.get_center() + 2.5*LEFT) - you = Text("YOU", color=WHITE, font_size=34).move_to(king.get_center() + 2*LEFT) - - self.play(self.camera.frame.animate.move_to(qkline.get_center()).set_width(13), FadeIn(queen, qkline, you, opponent), FadeOut(unknown)) - self.wait(5) - - lose_1 = Tex("(-2)", color=WHITE, font_size=34).move_to(call.get_center()+0.5*DOWN) - lose_2 = Tex("(-1)", color=WHITE, font_size=34).move_to(fold.get_center()+0.5*DOWN) - - self.add(lose_1) - self.play(self.camera.frame.animate.move_to(bet.get_center() + DOWN).set(width=11)) - self.wait(1) - - fold_underlined = Underline(fold) - fold_underlined.set_stroke(width=2) - - self.play(VGroup(call_line, call, new_left_fifty, lose_1).animate.set_opacity(0.3), VGroup(fold_line, fold, new_right_fifty).animate.set_opacity(1), - Uncreate(call_underlined), - Write(fold_underlined), - ) - self.wait(1) - self.play(Write(lose_2)) - self.wait(2) - - - # --- Update to new CFR values --- - new_new_left_fifty = Tex("33\%", color=WHITE, font_size=24).move_to(call_line.get_center() + 0.25*UP + 0.25 * LEFT).set_opacity(0.3) - new_new_right_fifty = Tex("67\%", color=WHITE, font_size=24).move_to(fold_line.get_center() + 0.25 *UP + 0.25 * RIGHT) - - self.play(AnimationGroup(Transform(new_left_fifty, new_new_left_fifty), Transform(new_right_fifty, new_new_right_fifty), run_time=1)) - - - - bet_underlined.restore() - self.wait(2) - self.play(Uncreate(fold_underlined),VGroup(new_right_fifty, fold_line, fold, lose_2).animate.set_opacity(0.3), Create(bet_underlined)) - self.play(VGroup(bet, bet_line, right_fifty).animate.set_opacity(0.3), Uncreate(bet_underlined), - ) - - check_underlined = Underline(check) - check_underlined.set_stroke(width=2) - self.play(Write(check_underlined), VGroup(check, check_line, left_fifty).animate.set_opacity(1), - self.camera.frame.animate.move_to(check.get_center() + DOWN) - - ) - - - - - - - - - bet2 = Tex("Bet", color=WHITE, 
font_size=34).move_to(check.get_center()+ LEFT +1.3*DOWN) - check2 = Tex("Check", color=WHITE, font_size=34).move_to(check.get_center()+ 0.8 * RIGHT +1.3*DOWN) - - bet2_line = Line(check.get_center() + 0.1 * DOWN, bet2.get_center(), color=WHITE, buff=0.4) - check2_line = Line(check.get_center() + 0.1 * DOWN, check2.get_center(), color=WHITE, buff=0.4) - - call2 = Tex("Call", color=WHITE, font_size=34).move_to(bet2.get_center()+ LEFT +1.5*DOWN) - fold2 = Tex("Fold", color=WHITE, font_size=34).move_to(bet2.get_center()+ RIGHT +1.5*DOWN) - - call2_line = Line(bet2.get_center() + 0.1 * DOWN, call2.get_center(), color=WHITE, buff=0.4) - fold2_line = Line(bet2.get_center() + 0.1 * DOWN, fold2.get_center(), color=WHITE, buff=0.4) - - - self.play(Create(bet2_line), Write(bet2), Create(check2_line), Write(check2)) - self.play(Create(call2_line), Write(call2), Create(fold2_line), Write(fold2)) - self.play(FadeOut(left_fifty, right_fifty, new_left_fifty, new_right_fifty, lose_1, lose_2), - VGroup(bet, bet_line, fold, fold_line).animate.set_opacity(1), - Uncreate(check_underlined), - - ) - - - - self.wait(2) - - king_group = VGroup(check, check_line, bet, bet_line, call, call_line, check2, check2_line, bet2, bet2_line, call2, call2_line, fold, fold_line, fold2, fold2_line, qkline) - - ace_group = king_group.copy().move_to(king_group.get_center() + 6 * LEFT) - queen_group = king_group.copy().move_to(king_group.get_center() + 6 * RIGHT) - - ace_card = ImageMobject('../assets/Ac.png').shift(6*LEFT + UP).scale(0.6) - queen_card = ImageMobject('../assets/Qc.png').shift(6*RIGHT + UP).scale(0.6) - - unknown.move_to(queen.get_center()) - unknown_ace = unknown.copy().shift(6*LEFT) - unknown_queen = unknown.copy().shift(6*RIGHT) - unknown_king = unknown.copy() - - opponent.move_to(unknown_ace.get_center() + 2.5*LEFT) - you.move_to(ace_card.get_center() + 2*LEFT) - - line_misc = Line(unknown_king.get_center(), ace_card.get_center(), color=WHITE, buff=1) - - 
self.play(self.camera.frame.animate.set(width=23).move_to(king_group.get_center()), - FadeIn(ace_group, queen_group, ace_card, queen_card, unknown_ace, unknown_queen, unknown_king), - - - ) - - - - - - - - - - - - -class CFRScene(Scene): - """ - Explain CFR using Kuhn Poker - ['1', '?'] {'p': 0.6706351182714639, 'b': 0.3293648817285361} - ['?', '3', 'p'] {'p': 0.999997996409566, 'b': 2.0035904340578316e-06} - ['1', '?', 'p', 'b'] {'p': 1.4911118920247454e-06, 'b': 0.9999985088881079} - ['?', '3', 'b'] {'p': 2.0035904340578316e-06, 'b': 0.999997996409566} - ['?', '2', 'p'] {'p': 0.9999980011273641, 'b': 1.9988726358333898e-06} - ['?', '2', 'b'] {'p': 0.6637515887115898, 'b': 0.3362484112884102} - ['3', '?'] {'p': 2.0019138296211177e-06, 'b': 0.9999979980861704} - ['3', '?', 'p', 'b'] {'p': 0.5, 'b': 0.5} - ['2', '?'] {'p': 0.9999867125964585, 'b': 1.3287403541442714e-05} - ['?', '1', 'p'] {'p': 1.9975470122689336e-06, 'b': 0.9999980024529878} - ['2', '?', 'p', 'b'] {'p': 0.9999990009338271, 'b': 9.990661728482387e-07} - ['?', '1', 'b'] {'p': 0.9999980024529878, 'b': 1.9975470122689336e-06} - """ - def construct(self): - """ - - Steps: - 1. Build the entire game tree - 2. 
Add the information sets - - """ - cfr = ManimCFR(create_infoSet, create_history, iterations=100, tracker_interval=10) - terminal_histories = cfr.solve(method="manim", debug=False) # Get all possible rollouts and use that to build our tree - tracker = cfr.tracker - - - # From player 1 POV - node_positions: Dict[str, np.array] = {} - - def get_position(hist: List[Action]): # Get the node for this particular position - print(f"getting position for {hist}") - player = len(hist) % 2 - position = (ORIGIN + 2 * UP).copy() - if player == 0: - if hist[1] == '1': - position += 2 * LEFT - elif hist[1] == '3': - position += 2* RIGHT - elif hist[1] == '?': - position += DOWN - if hist[0] == '1': - positions += 2 * LEFT - elif hist[0] == '3': - positions += 2* RIGHT - - if len(hist) > 2: - position += DOWN - if hist[2] == 'p': - position += LEFT - else: - position += RIGHT - - if len(hist) > 3: - position += DOWN - if hist[3] == 'p': - position += 0.5 * LEFT - else: - position += 0.5 * RIGHT - - if len(hist) > 4: - position += DOWN - if hist[3] == 'f': - position += 0.5 * LEFT - else: - position += 0.5 * RIGHT - return position - - - for history in terminal_histories: - for i in range(2,len(history.history)): - hist = ''.join(history.history[:i]) - if len(hist) != 0 and hist not in node_positions: - node_positions[hist] = get_position(hist) - - - # infoSets = tracker.tracker_hist[0] - # for infoSet in infoSets.values(): - # print(infoSet.infoSet, infoSet.get_average_strategy()) - print(node_positions) - - for name in node_positions: - self.play(Create(Tex(name[-1], font_size=50).move_to(node_positions[name]))) - # cfr.histories - - - - - - -class NashEquilibriumText(Scene): - def construct(self): - nashEquilibrium = Tex('Nash Equilibrium',font_size=100) - self.play(Write(nashEquilibrium, run_time=1.5)) - d1 = Tex(r'A scenario in game theory in which no') - d2 = Tex(r'players can improve by deviating from') - d3 = Tex(r'their strategy.') - d = 
VGroup(d1,d2,d3).arrange(direction=DOWN, aligned_edge=LEFT, buff=0.2) - self.play(AnimationGroup(nashEquilibrium.animate.shift(2 * UP), FadeIn(d, run_time=1.5), lag_ratio=1)) - self.wait(2) - self.play(FadeOut(*self.mobjects)) # TODO: Maybe link this together with the next animation - - -class bits(Scene): - def construct(self): - MONOLISA_FONT = 'MonoLisa' - cardsText = Text('52 Cards', font=MONOLISA_FONT) - bits = Text(r'0000000000000000000000000000000000000000000000000', font=MONOLISA_FONT, font_size=28) - self.play(ReplacementTransform(cardsText, bits)) - bits_with_cards = Text(r'0010001000000000000000000000000000000000000000100', font=MONOLISA_FONT, font_size=28) - self.play(ReplacementTransform(bits, bits_with_cards)) - clubs_02 = ImageMobject('../assets/cards/card_clubs_02.png') - self.play(FadeIn(clubs_02)) - # self.play(ReplacementTransform(bits_with_cards, clubs_02)) - - -def create_mobject(choice): - return ImageMobject(f'assets/{choice}.png').scale(0.5) - - -class RPS(Scene): - def construct(self): - background = ImageMobject('assets/background.png').scale(0.4) - self.add(background) - - rock = create_mobject('rock').shift(4.5*LEFT) - paper = create_mobject('paper') - scissors = create_mobject('scissors').shift(4.5*RIGHT) - - self.play(FadeIn(rock, scale=0.5), FadeIn(paper, scale=0.5), FadeIn(scissors, scale=0.5)) - # self.play(ReplacementTransform(rText, rock), ReplacementTransform(pText, paper), ReplacementTransform(sText, scissors)) - - self.wait(1) - - opponentText = Paragraph("Opponent\nChoice", font_size=40, alignment='center').shift(2.7*UP) - arrow = Arrow(ORIGIN, DOWN, buff=0.1, stroke_width=3, color=BLACK).next_to(opponentText, DOWN) - self.play(Write(opponentText), rock.animate.shift(0.5*DOWN), paper.animate.shift(0.5*DOWN), scissors.animate.shift(0.5*DOWN)) - # oval = Ellipse(width=4, height=8, color=BLACK, fill_opacity=0) - highlight = ImageMobject('assets/emphasize.png').scale(0.6).stretch(0.92, 1).shift(0.2*UP).stretch(0.9, 0).scale(0) - 
self.play(AnimationGroup(Create(arrow), FadeIn(highlight), lag_ratio=0.5)) - group = Group(opponentText, arrow, highlight) - self.play(group.animate.shift(4.5*RIGHT)) - self.wait(0.3) - rock2 = ImageMobject('assets/question.png').scale(0.5).shift(4.5*LEFT).shift(0.5*DOWN) - paper2 = ImageMobject('assets/question.png').scale(0.5).shift(0.5*DOWN) - scissors2 = ImageMobject('assets/question.png').scale(0.5).shift(4.5*RIGHT).shift(0.5*DOWN) - self.play(FadeIn(rock2), FadeIn(paper2), FadeIn(scissors2), group.animate.shift(9*LEFT), FadeOut(scissors), FadeOut(paper), FadeOut(rock)) - self.wait(0.3) - self.play(group.animate.shift(4.5*RIGHT)) - self.wait(0.3) - self.play(group.animate.shift(4.5*LEFT)) - self.wait(0.3) - self.play(group.animate.shift(9*RIGHT)) - self.wait(0.8) - - to_remove = [] - for obj in self.mobjects: - if obj != background: - to_remove.append(obj) - self.play(FadeOut(*to_remove)) - - -class RPS2(Scene): - def construct(self): - background = ImageMobject('assets/background.png').scale(0.4) - self.add(background) - - rock = create_mobject('rock').shift(4*LEFT + 2*UP) - paper = create_mobject('paper').shift(2*UP) - scissors = create_mobject('scissors').shift(4*RIGHT + 2*UP) - - self.play(AnimationGroup(FadeIn(rock, scale=0.4), FadeIn(paper, scale=0.4), FadeIn(scissors, scale=0.4), lag_ratio=0.7)) - self.wait(1) - - rock2 = rock.copy().next_to(scissors, 5*DOWN) - paper2 = paper.copy().next_to(rock, 3*DOWN) - scissors2 = scissors.copy().next_to(paper, 3*DOWN) - - x1 = [rock.get_x(), 1, 0] - x2 = [paper.get_x(), 1, 0] - x3 = [scissors.get_x(), 1, 0] - line1 = Line(x1, x1 + DOWN, color=BLACK) - line2 = Line(x2, x2 + DOWN, color=BLACK) - line3 = Line(x3, x3 + DOWN, color=BLACK) - - line1.add_updater(lambda m: m.put_start_and_end_on([min(rock.get_x(), -0.5), 1, 0], x1 + DOWN)) - line2.add_updater(lambda m: m.put_start_and_end_on([paper.get_x(), 1, 0], x2 + DOWN)) - line3.add_updater(lambda m: m.put_start_and_end_on([max(scissors.get_x(), 0.5), 1, 0], x3 + 
DOWN)) - - self.play(AnimationGroup(FadeIn(paper2, scale=0.4), FadeIn(scissors2, scale=0.4), FadeIn(rock2, scale=0.4), lag_ratio=0.7), AnimationGroup(Create(line1), Create(line2), Create(line3), lag_ratio=0.7)) - self.wait(1) - - q = ImageMobject('assets/question.png').scale(0.5).shift(2*UP) - self.play(AnimationGroup(AnimationGroup(rock.animate.shift(4*RIGHT), scissors.animate.shift(4*LEFT)), FadeIn(q), lag_ratio=0.3)) - self.play(FadeOut(rock), FadeOut(scissors), FadeOut(paper)) - self.wait(2) - self.play(AnimationGroup(AnimationGroup(FadeOut(line1), FadeOut(line2), FadeOut(line3), FadeOut(q)), AnimationGroup(rock2.animate.shift(2.5*UP), paper2.animate.shift(2.5*UP), scissors2.animate.shift(2.5*UP)), lag_ratio=0.3)) - - one_third_text = Text("1/3", font_size=60).next_to(paper2, 2*DOWN) - one_third_text2 = Text("1/3", font_size=60).next_to(rock2, 2*DOWN) - one_third_text3 = Text("1/3", font_size=60).next_to(scissors2, 2*DOWN) - - self.play(Write(one_third_text), Write(one_third_text2), Write(one_third_text3)) - self.wait(2) - self.play(FadeOut(one_third_text), FadeOut(one_third_text2), FadeOut(one_third_text3), FadeOut(rock2), FadeOut(paper2), FadeOut(scissors2)) - - -# config.background_color = GRAY_BROWN -# Text.set_default(font='Shadows Into Light', color=WHITE) - -class RPSSim(Scene): - """ - When you play randomly - """ - def construct(self): - # TODO: Same code as below - return - -class RPSSimRock(Scene): - """ - When you play rock all the time, you opponent catches on after 10 iterations - """ - def construct(self): - background = ImageMobject('assets/background.png').scale(0.4) - self.add(background) - - - self.wait(2) - rock = create_mobject('rock').shift(UP) - hundred = Text("100%", font_size=100).shift(1.2*DOWN) - self.play(FadeIn(rock), FadeIn(hundred)) - self.wait(2) - - - - - - - plane = NumberPlane( - x_range = (0, 1), - y_range = (0, 1, 0.5), - x_length=4, - y_length=4, - background_line_style={ - "stroke_width": 0 - }, - 
axis_config={"include_numbers": True, "color": BLACK}, - y_axis_config={"label_direction": LEFT}, - ) - plane.get_x_axis().numbers.set_color(BLACK) - plane.get_y_axis().numbers.set_color(BLACK) - # plane.center() - plane.shift(4*RIGHT) - x_values = [0] - y_values = [0] - line_graph = plane.plot_line_graph( - x_values = x_values, - y_values = y_values, - line_color=GOLD_E, - vertex_dot_radius=0, - stroke_width = 4, - ) - - your_score_tracker = ValueTracker(0) - opponent_score_tracker = ValueTracker(0) - you = Text('You').shift(5*LEFT + 2*UP) - opp= Text('Opponent').shift(LEFT + 2*UP) - your_score_placeholder = Text('0', font_size=80).next_to(you, DOWN) - opponent_score_placeholder = Text('0', font_size=80).next_to(opp, DOWN) - - self.play(rock.animate.move_to(5*LEFT + DOWN), FadeOut(hundred), FadeIn(plane, line_graph, you, opp, your_score_placeholder, opponent_score_placeholder, Text('Win Rate over Time', font_size=30).next_to(plane, UP), Text('Time', font_size=20).next_to(plane, DOWN).shift(2*RIGHT), Text('Win Rate', font_size=20).next_to(plane, LEFT).shift(2*UP))) - self.wait(1) - - - for i in range(1,25): - group = ['rock', 'paper', 'scissors'] - # choice_player = np.random.randint(0,3) - choice_player = 0 - - if i == 1: - choice_opponent = 0 - elif i == 2: - choice_opponent = 1 - elif i == 3: - choice_opponent = 2 - else: - choice_opponent = 1 - - if choice_player == choice_opponent: - text = Text("Tie.", font_size=50) - elif (choice_player - choice_opponent) % 3 == 1: - text = Text("Win!", font_size=50) - your_score_tracker += 1 - else: - text = Text("Loss :(", font_size=50) - opponent_score_tracker += 1 - - x_values.append(i) - if (your_score_tracker.get_value() + opponent_score_tracker.get_value()) == 0: - y_values.append(0) - else: - y_values.append(your_score_tracker.get_value() / (your_score_tracker.get_value() + opponent_score_tracker.get_value())) - - player = create_mobject(group[choice_player]).move_to(5*LEFT + DOWN) - opponent = 
create_mobject(group[choice_opponent]).move_to(LEFT + DOWN) - - your_score_updated = Text(str(int(your_score_tracker.get_value())), font_size=80).next_to(you, DOWN) - opponent_score_updated = Text(str(int(opponent_score_tracker.get_value())), font_size=80).next_to(opp, DOWN) - - text.shift(DOWN + 3*LEFT) - self.play(AnimationGroup(FadeIn(player), FadeIn(opponent), - ), run_time=0.1) - self.remove(rock) - self.add(text) - self.play(AnimationGroup(your_score_placeholder.animate.become(your_score_updated), opponent_score_placeholder.animate.become(opponent_score_updated), run_time=0.2)) - # self.wait(0.2) - - new_plane = NumberPlane( - x_range = (0, i), - y_range = (0, 1, 0.5), - x_length=4, - y_length=4, - background_line_style={ - "stroke_width": 0 - }, - axis_config={"include_numbers": True, "color": BLACK}, - y_axis_config={"label_direction": LEFT}, - ) - new_plane.get_x_axis().numbers.set_color(BLACK) - new_plane.get_y_axis().numbers.set_color(BLACK) - new_plane.shift(4*RIGHT) - - if i <=3: - run_time = 0.5 - elif i <= 7: - run_time = 0.25 - else: - run_time = 0.05 - self.play(line_graph.animate.become(new_plane.plot_line_graph( - x_values = x_values, - y_values = y_values, - line_color=GOLD_E, - vertex_dot_radius=0, - stroke_width = 4, - )), plane.animate.become(new_plane), run_time=run_time) - - self.play(AnimationGroup(FadeOut(player), FadeOut(opponent), run_time=0.1)) - self.remove(text) - - - - - - - - -class RPSold(Scene): - """ - Not using this because it seems too complicated for no reason. Remember that simplicity is key. - - But actually, game trees are useful. but this is too complicated redo it. 
- - Maybe use this to say that if you knew what you opponent was going to play, then - """ - def construct(self): - rText = Tex("Rock", font_size=100).shift(3.5*LEFT) - pText = Tex("Paper", font_size=100).shift(0.1 * DOWN, 0.5 * LEFT) - sText = Tex("Scissors", font_size=100).shift(3*RIGHT) - - youText = Tex("You", font_size=50).shift(2*LEFT + UP) - vs = Tex("vs.", font_size=20) - opponentText = Tex("Opponent", font_size=50).shift(2*RIGHT + UP) - - self.play(Write(rText), Write(pText), Write(sText)) - self.play(Transform(rText, youText), Transform(pText, vs), ReplacementTransform(sText, opponentText)) - - rock = create_mobject('rock').shift(2 * LEFT) - self.play(FadeIn(rock)) - q = Text("?").scale(2).shift(2 * RIGHT) - self.play(FadeIn(q)) - - startPos = 2*UP - blueStart = Circle(0.3, color=RED).shift(startPos) - player_text = Tex("Opponent", font_size=36).shift(startPos + 2*RIGHT) - self.play(AnimationGroup(Transform(opponentText, blueStart), Write(player_text), lag_ratio=0.6)) - # This is a "hack" to allow me to create duplicates to transform one node into three nodes - blueStart1 = Circle(0.3, color=RED).move_to(blueStart.get_center()) - blueStart2 = Circle(0.3, color=RED).move_to(blueStart.get_center()) - blueStart3 = Circle(0.3, color=RED).move_to(blueStart.get_center()) - - redRock = Circle(0.3, color=BLUE).move_to(blueStart.get_center()).shift(3*LEFT + 2 * DOWN) - redPaper = Circle(0.3, color=BLUE).move_to(blueStart.get_center()).shift(2 * DOWN) - redScissors = Circle(0.3, color=BLUE).move_to(blueStart.get_center()).shift(3*RIGHT + 2 * DOWN) - - redRPS = [redRock, redPaper, redScissors] - - # Edges between 1st layer and 2nd layer - line1 = Line(LEFT, LEFT) # intialize empty line - line2 = Line(LEFT, LEFT) # intialize empty line - line3 = Line(LEFT, LEFT) # intialize empty line - line1.add_updater(lambda z: z.become(Line(normalize(blueStart1.get_center() - blueStart.get_center()) * 0.3 + blueStart.get_center(), normalize(-blueStart1.get_center() + 
blueStart.get_center()) * 0.3 + blueStart1.get_center())) if blueStart.point_at_angle(225*DEGREES)[0] > blueStart1.point_at_angle(45*DEGREES)[0] else None) - line2.add_updater(lambda z: z.become(Line(blueStart.point_at_angle(270*DEGREES), blueStart2.point_at_angle(90*DEGREES))) if blueStart.point_at_angle(270*DEGREES)[1] > blueStart2.point_at_angle(90*DEGREES)[1] else None) - line3.add_updater(lambda z: z.become(Line(normalize(blueStart3.get_center() - blueStart.get_center()) * 0.3 + blueStart.get_center(), normalize(-blueStart3.get_center() + blueStart.get_center()) * 0.3 + blueStart3.get_center())) if blueStart.point_at_angle(315*DEGREES)[0] < blueStart3.point_at_angle(135*DEGREES)[0] else None) - self.add(line1, line2, line3) - - # self.play(AnimationGroup(Transform(blueStart1, redRPS[0]), Transform(blueStart2, redRPS[1]), Transform(blueStart3, redRPS[2]))) - # lag_ratio = 0.5 - - opponent_text = Tex("You", font_size=36).move_to(redScissors.get_center()).shift(1.5*RIGHT) - rockVec = redRock.get_center() - blueStart.get_center() - scissorsVec = redScissors.get_center() - blueStart.get_center() - rockText = Tex("Rock", font_size=28).rotate(np.arctan(rockVec[1] /rockVec[0])).move_to(line1.point_from_proportion(0.6) + 0.3 *UP) - paperText = Tex("Paper", font_size=28).rotate(-PI/2).move_to(line2.point_from_proportion(0.5) + 0.23 *RIGHT) - scissorsText = Tex("Scissors", font_size=28).rotate(np.arctan(scissorsVec[1] /scissorsVec[0])).move_to(line3.point_from_proportion(0.6) + 0.3 *UP) - rockText.add_updater(lambda z: z.move_to(line1.point_from_proportion(0.6) + 0.3 *UP)) - paperText.add_updater(lambda z: z.move_to(line2.point_from_proportion(0.5) + 0.23 *RIGHT)) - scissorsText.add_updater(lambda z: z.move_to(line3.point_from_proportion(0.6) + 0.3 *UP)) - # self.play(Write(rockText), Write(paperText), Write(scissorsText),Write(opponent_text)) - self.play(AnimationGroup(AnimationGroup(Transform(blueStart1, redRPS[0]), Transform(blueStart2, redRPS[1]), 
Transform(blueStart3, redRPS[2])), AnimationGroup(Write(rockText), Write(paperText), Write(scissorsText),Write(opponent_text)), lag_ratio=0.7)) - opponent_text.add_updater(lambda z: z.move_to(redScissors.get_center()).shift(1.5*RIGHT)) - - redRPSDuplicates = [] - for val in redRPS: - redRPSDuplicates.append([Circle(0.3, color=BLUE).move_to(val.get_center()) for _ in range(3)]) - - - flatRedRPSDuplicates = [item for ll in redRPSDuplicates for item in ll] - self.add(*flatRedRPSDuplicates) - redRPSTransforms = [] - for i in range(3): # i == 0 -> Rock, i == 1 -> Paper, i == 2 -> Scissors - redRPSTransforms.append([]) - for j in range(3): - if j == 0: - redRPSTransforms[i].append(Transform(redRPSDuplicates[i][0], Circle(0.3, color=GREY).move_to(redRPS[i].get_center()).shift(LEFT+2*DOWN))) - elif j == 1: - redRPSTransforms[i].append(Transform(redRPSDuplicates[i][1], Circle(0.3, color=GREY).move_to(redRPS[i].get_center()).shift(2*DOWN))) - else: - redRPSTransforms[i].append(Transform(redRPSDuplicates[i][2], Circle(0.3, color=GREY).move_to(redRPS[i].get_center()).shift(RIGHT + 2*DOWN))) - - - lines = [] - lines.append([Line(LEFT, LEFT) for _ in range(3)]) - lines.append([Line(LEFT, LEFT) for _ in range(3)]) - lines.append([Line(LEFT, LEFT) for _ in range(3)]) - lines[0][0].add_updater(lambda z: z.become(Line(redRPS[0].point_at_angle(225*DEGREES), redRPSDuplicates[0][0].point_at_angle(90*DEGREES))) if redRPS[0].point_at_angle(225*DEGREES)[0] > redRPSDuplicates[0][0].point_at_angle(90*DEGREES)[0] else None) - lines[0][1].add_updater(lambda z: z.become(Line(redRPS[0].point_at_angle(270*DEGREES), redRPSDuplicates[0][1].point_at_angle(90*DEGREES))) if redRPS[0].point_at_angle(270*DEGREES)[1] > redRPSDuplicates[0][1].point_at_angle(90*DEGREES)[1] else None) - lines[0][2].add_updater(lambda z: z.become(Line(redRPS[0].point_at_angle(315*DEGREES), redRPSDuplicates[0][2].point_at_angle(90*DEGREES))) if redRPS[0].point_at_angle(315*DEGREES)[0] < 
redRPSDuplicates[0][2].point_at_angle(90*DEGREES)[0] else None) - lines[1][0].add_updater(lambda z: z.become(Line(redRPS[1].point_at_angle(225*DEGREES), redRPSDuplicates[1][0].point_at_angle(90*DEGREES))) if redRPS[1].point_at_angle(225*DEGREES)[0] > redRPSDuplicates[1][0].point_at_angle(90*DEGREES)[0] else None) - lines[1][1].add_updater(lambda z: z.become(Line(redRPS[1].point_at_angle(270*DEGREES), redRPSDuplicates[1][1].point_at_angle(90*DEGREES))) if redRPS[1].point_at_angle(270*DEGREES)[1] > redRPSDuplicates[1][1].point_at_angle(90*DEGREES)[1] else None) - lines[1][2].add_updater(lambda z: z.become(Line(redRPS[1].point_at_angle(315*DEGREES), redRPSDuplicates[1][2].point_at_angle(90*DEGREES))) if redRPS[1].point_at_angle(315*DEGREES)[0] < redRPSDuplicates[1][2].point_at_angle(90*DEGREES)[0] else None) - lines[2][0].add_updater(lambda z: z.become(Line(redRPS[2].point_at_angle(225*DEGREES), redRPSDuplicates[2][0].point_at_angle(90*DEGREES))) if redRPS[2].point_at_angle(225*DEGREES)[0] > redRPSDuplicates[2][0].point_at_angle(90*DEGREES)[0] else None) - lines[2][1].add_updater(lambda z: z.become(Line(redRPS[2].point_at_angle(270*DEGREES), redRPSDuplicates[2][1].point_at_angle(90*DEGREES))) if redRPS[2].point_at_angle(270*DEGREES)[1] > redRPSDuplicates[2][1].point_at_angle(90*DEGREES)[1] else None) - lines[2][2].add_updater(lambda z: z.become(Line(redRPS[2].point_at_angle(315*DEGREES), redRPSDuplicates[2][2].point_at_angle(90*DEGREES))) if redRPS[2].point_at_angle(315*DEGREES)[0] < redRPSDuplicates[2][2].point_at_angle(90*DEGREES)[0] else None) - - - flatLines = [item for line in lines for item in line] - self.add(*flatLines) - flatRedRPSTransforms = [item for sublist in redRPSTransforms for item in sublist] - end_of_game_text = Tex("End of Game", font_size=28).move_to(redRPS[2].get_center()).shift(2.5*RIGHT + 2*DOWN) - - rockVec = LEFT + 2*DOWN - scissorsVec = RIGHT + 2 * DOWN - rockText1 = Tex("Rock", font_size=20).rotate(np.arctan(rockVec[1] 
/rockVec[0])).move_to(lines[0][0].point_from_proportion(0.6) + 0.3 *UP) - paperText1 = Tex("Paper", font_size=20).rotate(-PI/2).move_to(lines[0][1].point_from_proportion(0.5) + 0.15 *RIGHT) - scissorsText1 = Tex("Scissors", font_size=20).rotate(np.arctan(scissorsVec[1] /scissorsVec[0])).move_to(lines[0][2].point_from_proportion(0.6) + 0.3 *UP) - rockText2 = Tex("Rock", font_size=20).rotate(np.arctan(rockVec[1] /rockVec[0])).move_to(lines[1][0].point_from_proportion(0.6) + 0.3 *UP) - paperText2 = Tex("Paper", font_size=20).rotate(-PI/2).move_to(lines[1][1].point_from_proportion(0.5) + 0.15 *RIGHT) - scissorsText2 = Tex("Scissors", font_size=20).rotate(np.arctan(scissorsVec[1] /scissorsVec[0])).move_to(lines[1][2].point_from_proportion(0.6) + 0.3 *UP) - rockText3 = Tex("Rock", font_size=20).rotate(np.arctan(rockVec[1] /rockVec[0])).move_to(lines[2][0].point_from_proportion(0.6) + 0.3 *UP) - paperText3 = Tex("Paper", font_size=20).rotate(-PI/2).move_to(lines[2][1].point_from_proportion(0.5) + 0.15 *RIGHT) - scissorsText3 = Tex("Scissors", font_size=20).rotate(np.arctan(scissorsVec[1] /scissorsVec[0])).move_to(lines[2][2].point_from_proportion(0.6) + 0.3 *UP) - rockText1.add_updater(lambda z: z.move_to(lines[0][0].point_from_proportion(0.7) + 0.3 *UP)) - paperText1.add_updater(lambda z: z.move_to(lines[0][1].point_from_proportion(0.5) + 0.15 *RIGHT)) - scissorsText1.add_updater(lambda z: z.move_to(lines[0][2].point_from_proportion(0.7) + 0.3 *UP)) - rockText2.add_updater(lambda z: z.move_to(lines[1][0].point_from_proportion(0.7) + 0.3 *UP)) - paperText2.add_updater(lambda z: z.move_to(lines[1][1].point_from_proportion(0.5) + 0.15 *RIGHT)) - scissorsText2.add_updater(lambda z: z.move_to(lines[1][2].point_from_proportion(0.7) + 0.3 *UP)) - rockText3.add_updater(lambda z: z.move_to(lines[2][0].point_from_proportion(0.7) + 0.3 *UP)) - paperText3.add_updater(lambda z: z.move_to(lines[2][1].point_from_proportion(0.5) + 0.15 *RIGHT)) - scissorsText3.add_updater(lambda z: 
z.move_to(lines[2][2].point_from_proportion(0.7) + 0.3 *UP)) - - end_of_game_text.add_updater(lambda z: z.move_to(redRPS[2].get_center()).shift(2.5*RIGHT + 2 * DOWN)) - self.play(AnimationGroup(AnimationGroup(*flatRedRPSTransforms), AnimationGroup(Write(end_of_game_text), - Write(rockText1), Write(rockText2), Write(rockText3), Write(paperText1), Write(paperText2), Write(paperText3), Write(scissorsText1), Write(scissorsText2), Write(scissorsText3) - ), lag_ratio=0.7)) - self.wait(1) - # merge the nodes to show that we don't know - """ - Steps: - 1. Replace text with question mark - 2. Move red nodes into one - """ - # update the updaters so the lines can overlap - line1.clear_updaters() - line3.clear_updaters() - line1.add_updater(lambda z: z.become(Line(normalize(blueStart1.get_center() - blueStart.get_center()) * 0.3 + blueStart.get_center(), normalize(-blueStart1.get_center() + blueStart.get_center()) * 0.3 + blueStart1.get_center()))) - line3.add_updater(lambda z: z.become(Line(normalize(blueStart3.get_center() - blueStart.get_center()) * 0.3 + blueStart.get_center(), normalize(-blueStart3.get_center() + blueStart.get_center()) * 0.3 + blueStart3.get_center()))) - - # Remove the characters - self.play(FadeOut(rockText, paperText, scissorsText), - VGroup(redRPSDuplicates[0][0], redRPSDuplicates[0][1], redRPSDuplicates[0][2], redRPS[0], - ).animate.shift(redPaper.get_center() - redRPS[0].get_center()), - VGroup(redRPSDuplicates[2][0], redRPSDuplicates[2][1], redRPSDuplicates[2][2], redRPS[2], - ).animate.shift(redPaper.get_center() - redRPS[2].get_center()), - blueStart1.animate.shift(redPaper.get_center() - blueStart1.get_center()), - blueStart3.animate.shift(redPaper.get_center() - blueStart3.get_center()), - ) - questionMark = Tex('?').next_to(line2, RIGHT) - self.play(Write(questionMark)) - - # self.play(redRock.animate.move_to(redPaper), redScissors.animate.move_to(redPaper)) - - # self.play(Create(bluePaper, blueRock, blueScissors)) - -class badRPS(Scene): - 
# Idea: Start from drawing out the full game tree, and collapse opponent nodes into one - def construct(self): - G = nx.Graph() - G.add_node("ROOT") - - choices = ["Rock", "Paper", "Scissors"] - players = ["Opponent", "You"] - for player in ["Opponent", "You"]: - for choice in choices: - G.add_node(f'{player}_{choice}') - - if player == "Opponent": - G.add_edge("ROOT", f'{player}_{choice}') - else: - for opp_choice in choices: - G.add_edge(f'Opponent_{opp_choice}', f'{player}_{choice}') - - - gg = Graph(G.nodes, G.edges, root_vertex="ROOT", layout="circular", vertex_config={'radius': 0.2}, labels=True) - self.play(Create(gg)) - # self.play(gg.animate.change_layout("tree", root_vertex="ROOT")) - # self.wait() - -class CFRText(Scene): - def construct(self): - text = Text('Counterfactual Regret').scale(2) - text2 = Text('Minimization (CFR)').scale(2).next_to(text, DOWN) - self.play(Write(text)) - self.play(Write(text2)) - self.wait(2) - self.play(FadeOut(*self.mobjects)) - -class hist(Scene): - """ - Animation for the Histograms for the card abstraction - - Idea: - Start with the cards on each node. - Then, you use a histogram representation. - - - - - """ - def construct(self): - # distributions = np.random.random((10,5)) - values = [0.3,0.5,0.2] - chart = BarChart( - values - ) - self.play(Create(chart)) - - values2 = [0.5,0.2,0.4] - chart2 = BarChart( - values - ) - self.play(Transform(chart, chart2)) - - -class bet(Scene): - """ - Idea: - Have a continuous line, and then bucket these bets into discrete values. 
- - """ - def construct(self): - text_102 = Tex('102\$', font_size=64) - text_100 = Tex('100\$', font_size=64) - self.play(Transform(text_102, text_100)) - # number_line = NumberLine( - # x_range=[0, 100, 10], - # length=10, - # ) - # check_text = Tex('Check').next_to(number_line.n2p(0), DOWN) - # all_in_text = Tex('All-In').next_to(number_line.n2p(100), DOWN) - # self.play(AnimationGroup(Create(number_line), Write(check_text), Write(all_in_text), lag_ratio=0.3)) - # self.wait(2) - - start_node = Circle(0.3, color=BLUE).shift(2*UP) - target_nodes = VGroup(*[Circle(0.05, color=RED) for _ in range(60)]) - target_nodes.arrange(RIGHT, buff=0.1).shift(2*DOWN) - lines = [] - for i in range(60): - unit_v = normalize(target_nodes[i].get_center() - start_node.get_center()) - lines.append(Line(start_node.get_center() + 0.3 * unit_v, target_nodes[i].get_center() - 0.05 * unit_v, color=GRAY)) - Line(start_node) - self.play(Create(start_node)) - self.play(Create(target_nodes), Create(VGroup(*lines))) - self.wait(2) - - target_nodes_2 = VGroup(*[Circle(0.2, color=RED) for _ in range(10)]) - target_nodes_2.arrange(RIGHT, buff=0.2).shift(2*DOWN) - lines2 = [] - for i in range(10): - unit_v = normalize(target_nodes_2[i].get_center() - start_node.get_center()) - lines2.append(Line(start_node.get_center() + 0.3 * unit_v, target_nodes_2[i].get_center() - 0.2 * unit_v, color=GRAY)) - - self.play(Transform(VGroup(*target_nodes, *lines), VGroup(*target_nodes_2, *lines2))) - - -class valueTemplate(Scene): - def construct(self): - number_line = NumberLine() - pointer = Vector(DOWN).shift(UP) - label = Tex("x") - label.add_updater(lambda m: m.next_to(pointer, UP)) - - tracker = ValueTracker(0) - pointer.add_updater(lambda m: m.next_to( - number_line.n2p(tracker.get_value()), UP) - ) - self.add(number_line, pointer, label) - tracker += 1.5 - self.wait(1) - tracker -= 4 - self.wait(0.5) - self.play(tracker.animate.set_value(5)) - - -# class Histogram(Scene): -# """ -# Create a template for 
this histogram, which you will be able to recycle in the future: -# """ -# # Add Image -# corona= ImageMobject("assets/img/covid_19.png") -# corona.scale(1.2) -# corona.to_edge(RIGHT, buff=1) - -# self.add(corona) - - - - -class GraphExample(Scene): - def construct(self): - ax = Axes(x_range=[0,5,1], y_range=[0,3,1]) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f3c7525..15f3491 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ matplotlib pot # Not really needed, I found it to be too slow scikit-learn labml + diff --git a/research/kuhn/main.py b/research/kuhn/main.py index cee658c..3b835af 100644 --- a/research/kuhn/main.py +++ b/research/kuhn/main.py @@ -13,6 +13,7 @@ python main.py --play --- """ + from random import shuffle import joblib import numpy as np @@ -24,175 +25,202 @@ PLAYER = 0 AI = 1 + def get_action(strategy): - r = np.random.random() - cumulativeProbability = 0 - action = 0 - for a in range(len(strategy)): - action = a - cumulativeProbability += strategy[a] - if (r < cumulativeProbability): break - - if action == 0: - return 'p' - else: - return 'b' - + r = np.random.random() + cumulativeProbability = 0 + action = 0 + for a in range(len(strategy)): + action = a + cumulativeProbability += strategy[a] + if r < cumulativeProbability: + break + + if action == 0: + return "p" + else: + return "b" + + def get_strategy(card, strategy=0): - """ - strategy=0 -> CFR - stragegy=1 -> Pass if you have 1, Bet if you have 3, and play 50% of your hands with 2 - stragegy=2 -> Always pass - stragegy=3 -> Always bet - """ - if strategy == 0: - return get_action(nodeMap[str(card)].getAverageStrategy()) - elif strategy == 1: - if card == 1: - return 'p' - elif card == 3: - return 'b' - else: - r = np.random.random() - if r <= 0.5: - return 'p' - else: - return 'b' - elif strategy == 2: - return 'p' - elif strategy == 3: - return 'b' + """ + strategy=0 -> CFR + stragegy=1 -> Pass if you have 1, Bet if you have 3, 
and play 50% of your hands with 2 + stragegy=2 -> Always pass + stragegy=3 -> Always bet + stragegy=4 -> Random (50% pass, 50% bet) + """ + if strategy == 0: + return get_action(nodeMap[str(card)].getAverageStrategy()) + elif strategy == 1: + if card == 1: + return "p" + elif card == 3: + return "b" + else: + r = np.random.random() + if r <= 0.5: + return "p" + else: + return "b" + elif strategy == 2: + return "p" + elif strategy == 3: + return "b" + elif strategy == 4: + choices = ["p", "b"] + shuffle(choices) + return choices[0] def terminal(history): - if (len(history) > 1) and (history[-1] == 'p' or history[-2:] == "bb"): - return True - else: - return False + if (len(history) > 1) and (history[-1] == "p" or history[-2:] == "bb"): + return True + else: + return False + if __name__ == "__main__": - score = [0, 0] # [PLAYER_SCORE, AI_SCORE] - # Load the nodeMap - try: - nodeMap: Node = joblib.load("KuhnNodeMap.joblib") - except: - print("Could not load nodeMap. Please train the model first by running: python main.py") - exit() - - first_player_to_move = 0 - - parser = argparse.ArgumentParser(description='Play Kuhn Poker against the best AI possible.') - parser.add_argument("-p", "--play", - action="store_true", dest="user_input", default=False, - help="Manually play against the AI through the terminal.") - parser.add_argument("-v", "--verbose", - action="store_true", dest="verbose", default=False, - help="Manually play against the AI through the terminal.") - - args = parser.parse_args() - user_input = args.user_input - verbose = args.verbose # In case you want to see each game printed out in the terminal while running the simulation - - user_scores_over_time = [] - opponent_scores_over_time = [] - - cards = [1,2,3] # index 0 is for PLAYER, index 1 is for AI - for _ in range(1000000): - # Setup a new round - history = "" - first_player_to_move += 1 # Alternate players to play each round - first_player_to_move %= 2 - player_to_move = first_player_to_move - 
shuffle(cards) - - if user_input or verbose: - print("--------------------------") - print("Current Scoreboard:") - print("You: {}, Opponent: {}\n".format(score[0], score[1])) - print("You have been dealt a:", cards[0]) - - # Alternate every round between the players playing first - if player_to_move == PLAYER: - if user_input: # Manual Input - action = input('Please decide whether to pass or bet ("p" or "b"): ') - else: # Get a hardcoded trategy - action = get_strategy(cards[0], 1) - else: - action = get_strategy(cards[1]) - if user_input or verbose: - print("Your opponent has decided to play:", action) - - history += action - - while not terminal(history): - plays = len(history) - player = (player_to_move + plays) % 2 - - if player == PLAYER: - if user_input: - action = input('Please decide whether to pass or bet ("p" or "b"): ') - else: - action = get_strategy(cards[0], 1) - else: - action = get_strategy(cards[1]) - if user_input or verbose: - print("Your opponent has decided to play:", action) - - history += action - - # Return payoff for terminal states - terminalPass = history[-1] == 'p' - doubleBet = history[-2:] == "bb" - isPlayerCardHigher = cards[0] > cards[1] - - - temp_score = [0, 0] - if terminalPass: - if history == "pp": - if isPlayerCardHigher: - temp_score[0] += 1 - temp_score[1] -= 1 - - else: - temp_score[0] -= 1 - temp_score[1] += 1 - else: # Equivalent to folding - temp_score[(first_player_to_move + len(history)) % 2] += 1 - temp_score[(first_player_to_move + len(history) + 1) % 2] -= 1 - - elif doubleBet: - if isPlayerCardHigher: - temp_score[0] += 2 - temp_score[1] -= 2 - - else: - temp_score[0] -= 2 - temp_score[1] += 2 - - if user_input or verbose: - if temp_score[0] > temp_score[1]: - print("Congratulations, you won the round with {} extra chips!\n".format(temp_score[0])) - else: - print("You lost to a {} :( You lose {} chips.\n".format(cards[1], temp_score[1])) - - score[0] += temp_score[0] - score[1] += temp_score[1] - - # Score scores 
so it can be plotted afterwards - user_scores_over_time.append(score[0]) - opponent_scores_over_time.append(score[1]) - - plt.plot(user_scores_over_time) - plt.plot(opponent_scores_over_time) - if user_input: - plt.legend(['User Strategy', "CFR Strategy"], loc="upper left") - else: - plt.legend(['Deterministic Strategy', "CFR Strategy"], loc="upper left") - plt.xlabel("Number of Rounds") - plt.ylabel("Number of Chips Gained") - # plt.savefig("AI_score_over_time.png", bbox_inches='tight') # Uncomment to save the figure - plt.show() - - - - + score = [0, 0] # [PLAYER_SCORE, AI_SCORE] + # Load the nodeMap + try: + nodeMap: Node = joblib.load("KuhnNodeMap.joblib") + print("NodeMap loaded:") + print("InfoSet - Actions Probability") + for key in nodeMap.keys(): + # print(key, f"{float(nodeMap[key].getAverageStrategy()):.2f}") + print(key, "-", [round(val, 2) for val in nodeMap[key].getAverageStrategy()]) + except: + print("Could not load nodeMap. Please train the model first by running: python main.py") + exit() + + first_player_to_move = 0 + + parser = argparse.ArgumentParser(description="Play Kuhn Poker against the best AI possible.") + parser.add_argument( + "-p", + "--play", + action="store_true", + dest="user_input", + default=False, + help="Manually play against the AI through the terminal.", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + dest="verbose", + default=False, + help="Manually play against the AI through the terminal.", + ) + + args = parser.parse_args() + user_input = args.user_input + verbose = ( + args.verbose + ) # In case you want to see each game printed out in the terminal while running the simulation + + user_scores_over_time = [] + opponent_scores_over_time = [] + + cards = [1, 2, 3] # index 0 is for PLAYER, index 1 is for AI + for _ in range(1000000): + # Setup a new round + history = "" + first_player_to_move += 1 # Alternate players to play each round + first_player_to_move %= 2 + player_to_move = 
first_player_to_move + shuffle(cards) + + if user_input or verbose: + print("--------------------------") + print("Current Scoreboard:") + print("You: {}, Opponent: {}\n".format(score[0], score[1])) + print("You have been dealt a:", cards[0]) + + # Alternate every round between the players playing first + if player_to_move == PLAYER: + if user_input: # Manual Input + action = input('Please decide whether to pass or bet ("p" or "b"): ') + else: # Get a hardcoded trategy + action = get_strategy(cards[0], 4) + else: + action = get_strategy(cards[1]) + if user_input or verbose: + print("Your opponent has decided to play:", action) + + history += action + + while not terminal(history): + plays = len(history) + player = (player_to_move + plays) % 2 + + if player == PLAYER: + if user_input: + action = input('Please decide whether to pass or bet ("p" or "b"): ') + else: + action = get_strategy(cards[0], 1) + else: + action = get_strategy(cards[1]) + if user_input or verbose: + print("Your opponent has decided to play:", action) + + history += action + + # Return payoff for terminal states + terminalPass = history[-1] == "p" + doubleBet = history[-2:] == "bb" + isPlayerCardHigher = cards[0] > cards[1] + + temp_score = [0, 0] + if terminalPass: + if history == "pp": + if isPlayerCardHigher: + temp_score[0] += 1 + temp_score[1] -= 1 + + else: + temp_score[0] -= 1 + temp_score[1] += 1 + else: # Equivalent to folding + temp_score[(first_player_to_move + len(history)) % 2] += 1 + temp_score[(first_player_to_move + len(history) + 1) % 2] -= 1 + + elif doubleBet: + if isPlayerCardHigher: + temp_score[0] += 2 + temp_score[1] -= 2 + + else: + temp_score[0] -= 2 + temp_score[1] += 2 + + if user_input or verbose: + if temp_score[0] > temp_score[1]: + print( + "Congratulations, you won the round with {} extra chips!\n".format( + temp_score[0] + ) + ) + else: + print("You lost to a {} :( You lose {} chips.\n".format(cards[1], temp_score[1])) + + score[0] += temp_score[0] + score[1] += 
temp_score[1] + + # Score scores so it can be plotted afterwards + user_scores_over_time.append(score[0]) + opponent_scores_over_time.append(score[1]) + + print(history) + + plt.plot(user_scores_over_time) + plt.plot(opponent_scores_over_time) + if user_input: + plt.legend(["User Strategy", "CFR Strategy"], loc="upper left") + else: + plt.legend(["Deterministic Strategy", "CFR Strategy"], loc="upper left") + plt.xlabel("Number of Rounds") + plt.ylabel("Number of Chips Gained") + # plt.savefig("AI_score_over_time.png", bbox_inches='tight') # Uncomment to save the figure + plt.show() diff --git a/research/leduc/card.py b/research/leduc/card.py index 5594b8a..908a7fa 100644 --- a/research/leduc/card.py +++ b/research/leduc/card.py @@ -1,11 +1,7 @@ class Card: """Inspired from pycfr card.py""" - SUIT_STRING = { - 1: "s", - 2: "h", - 3: "d", - 4: "c" - } + + SUIT_STRING = {1: "s", 2: "h", 3: "d", 4: "c"} CARD_STRING = { 2: "2", 3: "3", @@ -19,14 +15,15 @@ class Card: 11: "J", 12: "Q", 13: "K", - 14: "A" + 14: "A", } + def __init__(self, rank, suit): self.rank = rank self.suit = suit def __repr__(self): - return '{}{}'.format(self.CARD_STRING[self.rank], self.SUIT_STRING[self.suit]) + return "{}{}".format(self.CARD_STRING[self.rank], self.SUIT_STRING[self.suit]) def __eq__(self, card): return card.rank == self.rank @@ -36,6 +33,3 @@ def __lt__(self, card): def __hash__(self): return hash(repr(self)) - - - diff --git a/src/README.md b/src/README.md index 8e6dd9b..f6c1d53 100644 --- a/src/README.md +++ b/src/README.md @@ -5,8 +5,7 @@ Design decisions I am trying to make: - Should I call it node or state? Ultimately node makes more sense from an intuitive level, because it is a node in the game tree. You train strategies on an information set though. You should have infosets in memory, but all possible nodes in memory. -And I -am a very visual person. But ultimately, each node represents a state, so the terms can be used interchangeably. +And I am a very visual person. 
But ultimately, each node represents a state, so the terms can be used interchangeably. So I don't have explicit state representation, but rather information sets can be visualized. - This is also more efficient for memory right? Actually, you should use states, and optimize with States later. diff --git a/src/base.py b/src/base.py index ee6629b..b56efa6 100644 --- a/src/base.py +++ b/src/base.py @@ -34,429 +34,493 @@ def terminal_utility(i: Player, history: History) -> float: CHANCE = "CHANCE_EVENT" -Player = NewType('Player', int) -Action = NewType('Action', str) - - -class History: - """ - The history includes the information about the set of cards we hold. - - In our 2 player version, Player 0 always is the first to act and player 1 is the second to act. - However, be warned that in a game such as Heads-Up NL Hold'Em, in later game stages, player 1 (big blind) - might be first to act. - """ - def __init__(self, history: List[Action] = []): - self.history = history - - def is_terminal(self): - raise NotImplementedError() - - def actions(self) -> List[Action]: - raise NotImplementedError() - - def player(self) -> Player: - # If chance event, return -1 as described in CFR paper - assert(not self.is_terminal()) - raise NotImplementedError() - - def is_chance(self) -> bool: - return self.player() == -1 - - def sample_chance_outcome(self) -> Action: - # TODO: Determine how to format this API - raise NotImplementedError() - - def terminal_utility(self, i: Player) -> int: - assert(self.is_terminal()) - assert(i in [0, 1]) - - raise NotImplementedError() - - def __add__(self, action: Action): - """ - This should always be something like: - - new_history = HoldemHistory(self.history + [action]) - return new_history +Player = NewType("Player", int) +Action = NewType("Action", str) + + +class History: + """ + The history includes the information about the set of cards we hold. 
+ + In our 2 player version, Player 0 always is the first to act and player 1 is the second to act. + However, be warned that in a game such as Heads-Up NL Hold'Em, in later game stages, player 1 (big blind) + might be first to act. + """ + + def __init__(self, history: List[Action] = []): + self.history = history + + def is_terminal(self): + raise NotImplementedError() + + def actions(self) -> List[Action]: + raise NotImplementedError() + + def player(self) -> Player: + # If chance event, return -1 as described in CFR paper + assert not self.is_terminal() + raise NotImplementedError() + + def is_chance(self) -> bool: + return self.player() == -1 + + def sample_chance_outcome(self) -> Action: + # TODO: Determine how to format this API + raise NotImplementedError() + + def terminal_utility(self, i: Player) -> int: + assert self.is_terminal() + assert i in [0, 1] + + raise NotImplementedError() + + def __add__(self, action: Action): + """ + This should always be something like: + + new_history = HoldemHistory(self.history + [action]) + return new_history + + """ + raise NotImplementedError() + + def get_infoSet_key(self) -> List[Action]: + assert not self.is_chance() # chance history should not be infosets + assert not self.is_terminal() + + raise NotImplementedError() + + def __repr__(self) -> str: + return str(self.history) + - """ - raise NotImplementedError() - - def get_infoSet_key(self) -> List[Action]: - assert(not self.is_chance()) # chance history should not be infosets - assert(not self.is_terminal()) - - raise NotImplementedError() - - def __repr__(self) -> str: - return str(self.history) - - class InfoSet: - """ - Most of the infoset information (actions, player) should be inherited from the history class. 
- - """ - def __init__(self, infoSet: List[Action], actions: List[Action], player: Player): - self.infoSet = infoSet - self.__actions = actions - self.__player = player - - self.regret = {a: 0 for a in self.actions()} - self.strategy = {a: 0 for a in self.actions()} - self.cumulative_strategy = {a: 0 for a in self.actions()} - self.get_strategy() - assert(1.0 - sum(self.strategy.values()) < 1e-6) - - - def __repr__(self) -> str: - return str(self.infoSet) - - def actions(self) -> List[Action]: - return self.__actions - - def player(self) -> Player: - return self.__player - - def to_dict(self): - return { - 'infoset': self.infoSet, - 'regret': self.regret, - 'cumulative_strategy': self.cumulative_strategy, - } - - def get_strategy(self): - """ - Updates the current strategy based on the current regret, using regret matching - """ - regret = {a: max(r, 0) for a, r in self.regret.items()} - - regret_sum = sum(regret.values()) - - if regret_sum > 0: - self.strategy = {a: r / regret_sum for a, r in regret.items()} - else: - self.strategy = {a: 1/len(self.actions()) for a in self.actions()} - - def get_average_strategy(self): - """ - """ - assert(len(self.actions()) == len(self.cumulative_strategy)) # The cumulative strategy should map a probability for every action + """ + Most of the infoset information (actions, player) should be inherited from the history class. 
- strategy_sum = sum(self.cumulative_strategy.values()) - - if strategy_sum > 0: - return {a: s / strategy_sum for a, s in self.cumulative_strategy.items()} - else: - return {a: 1/len(self.actions()) for a in self.actions()} - - -class CFR: - def __init__(self, create_infoSet, create_history, n_players: int = 2, iterations: int = 1000000, tracker_interval=50000): - self.n_players = n_players - self.iterations = iterations - self.tracker_interval = tracker_interval - self.infoSets: Dict[str, InfoSet] = {} - self.create_infoSet = create_infoSet - self.create_history = create_history - - self.tracker = InfoSetTracker() - - def get_infoSet(self, history: History) -> InfoSet: - infoSet_key = history.get_infoSet_key() - actions = history.actions() - player = history.player() - - assert(type(infoSet_key) == list) - assert(type(actions) == list) - - infoSet_key_str = ''.join(infoSet_key) - if infoSet_key_str not in self.infoSets: - self.infoSets[infoSet_key_str] = self.create_infoSet(infoSet_key, actions, player) - - return self.infoSets[infoSet_key_str] - - def vanilla_cfr(self, history: History, i: Player, t: int, pi_0: float, pi_1: float, debug=False): # Works for two players - # Return payoff for terminal states - if history.is_terminal(): - if debug: - print(f"history: {history.history} utility: {history.terminal_utility(i)}") - time.sleep(1) - return history.terminal_utility(i) - elif history.is_chance(): - a = history.sample_chance_outcome() # $\sigma_c$ is simply the $f_c$ function I believe... - return self.vanilla_cfr(history + a, i, t, pi_0, pi_1, debug=debug) # Since it is a chance outcome, the player does not change .. 
TODO: Check logic for this - - infoSet = self.get_infoSet(history) - assert(infoSet.player() == history.player()) - - if debug: - print("infoset", infoSet.to_dict()) - - v = 0 - va = {} + """ - for a in infoSet.actions(): - if history.player() == 0: - va[a] = self.vanilla_cfr(history + a, i, t, infoSet.strategy[a] * pi_0, pi_1, debug=debug) - else: - va[a] = self.vanilla_cfr(history + a, i, t, pi_0, infoSet.strategy[a] * pi_1, debug=debug) + def __init__(self, infoSet: List[Action], actions: List[Action], player: Player): + self.infoSet = infoSet + self.__actions = actions + self.__player = player - v += infoSet.strategy[a] * va[a] - - if history.player() == i: - for a in infoSet.actions(): - infoSet.regret[a] += (pi_1 if i == 0 else pi_0) * (va[a] - v) - # Update cumulative strategy values, this will be used to calculate the average strategy at the end - infoSet.cumulative_strategy[a] += (pi_0 if i == 0 else pi_1) * infoSet.strategy[a] - - # Update regret matching values - infoSet.get_strategy() - - return v - def vanilla_cfr_speedup(self, history: History, t: int, pi_0: float, pi_1: float, debug=False): - """ - We double the speed by updating both player values simultaneously, since this is a zero-sum game. - - """ - # Return payoff for terminal states - if history.is_terminal(): - if debug: - print(history.history, history.terminal_utility(0)) - time.sleep(1) - return history.terminal_utility(0) - elif history.is_chance(): - a = history.sample_chance_outcome() # $\sigma_c$ is simply the $f_c$ function I believe... - return self.vanilla_cfr_speedup(history + a, t, pi_0, pi_1, debug=debug) # Since it is a chance outcome, the player does not change .. 
TODO: Check logic for this - - infoSet = self.get_infoSet(history) - assert(infoSet.player() == history.player()) - - if debug: - print("infoset", infoSet.to_dict()) - - v = 0 - va = {} + self.regret = {a: 0 for a in self.actions()} + self.strategy = {a: 0 for a in self.actions()} + self.cumulative_strategy = {a: 0 for a in self.actions()} + self.get_strategy() + assert 1.0 - sum(self.strategy.values()) < 1e-6 - for a in infoSet.actions(): - if history.player() == 0: - va[a] = - self.vanilla_cfr_speedup(history + a, t, infoSet.strategy[a] * pi_0, pi_1, debug=debug) - else: - va[a] = - self.vanilla_cfr_speedup(history + a, t, pi_0, infoSet.strategy[a] * pi_1, debug=debug) + def __repr__(self) -> str: + return str(self.infoSet) - v += infoSet.strategy[a] * va[a] - - for a in infoSet.actions(): - infoSet.regret[a] += (pi_1 if history.player() == 0 else pi_0) * (va[a] - v) - # Update cumulative strategy values, this will be used to calculate the average strategy at the end - infoSet.cumulative_strategy[a] += (pi_0 if history.player() == 0 else pi_1) * infoSet.strategy[a] - - # Update regret matching values - infoSet.get_strategy() - - return v - - def vanilla_cfr_manim(self, history: History, i: Player, t: int, pi_0: float, pi_1: float, histories: List[History]): - # Return payoff for terminal states - if history.is_terminal(): - histories.append(history) - return history.terminal_utility(i) - elif history.is_chance(): - a = history.sample_chance_outcome() # $\sigma_c$ is simply the $f_c$ function I believe... - return self.vanilla_cfr_manim(history + a, i, t, pi_0, pi_1, histories) # Since it is a chance outcome, the player does not change .. 
TODO: Check logic for this - - infoSet = self.get_infoSet(history) - assert(infoSet.player() == history.player()) - - - v = 0 - va = {} + def actions(self) -> List[Action]: + return self.__actions - for a in infoSet.actions(): - if history.player() == 0: - va[a] = self.vanilla_cfr_manim(history + a, i, t, infoSet.strategy[a] * pi_0, pi_1, histories) - else: - va[a] = self.vanilla_cfr_manim(history + a, i, t, pi_0, infoSet.strategy[a] * pi_1, histories) + def player(self) -> Player: + return self.__player - v += infoSet.strategy[a] * va[a] - - if history.player() == i: - for a in infoSet.actions(): - infoSet.regret[a] += (pi_1 if i == 0 else pi_0) * (va[a] - v) - # Update cumulative strategy values, this will be used to calculate the average strategy at the end - infoSet.cumulative_strategy[a] += (pi_0 if i == 0 else pi_1) * infoSet.strategy[a] - - # Update regret matching values - infoSet.get_strategy() - - return v - def mccfr(self, history: History, i: Player, t: int, pi_0: float, pi_1: float, debug=False): # Works for two players - return - - - def solve(self, method='vanilla_speedup', debug=False): - util_0 = 0 - util_1 = 0 - if method == 'manim': - histories = [] - - for t in tqdm(range(self.iterations), desc = "CFR Training Loop"): - if method == 'vanilla_speedup': - util_0 += self.vanilla_cfr_speedup(self.create_history(), t, 1, 1, debug=debug) - - elif method == 'manim' and t < 10: - for player in range(self.n_players): - if player == 0: - util_0 += self.vanilla_cfr_manim(self.create_history(), player, t, 1, 1, histories) - else: - util_1 += self.vanilla_cfr_manim(self.create_history(), player, t, 1, 1, histories) - - print(histories) - - else: # vanilla - for player in range(self.n_players): # This is the slower way, we can speed by updating both players - if player == 0: - util_0 += self.vanilla_cfr(self.create_history(), player, t, 1, 1, debug=debug) - else: - util_1 += self.vanilla_cfr(self.create_history(), player, t, 1, 1, debug=debug) - - if ((t + 1) 
% self.tracker_interval == 0): - print("Average game value player 0: ", util_0/t) - print("Average game value player 1: ", util_1/t) - self.tracker(self.infoSets) - self.tracker.pprint() - - if method == 'manim': - return histories - - def export_infoSets(self): - joblib.dump(self.infoSets, "holdem_infoSets.joblib") - - - def get_expected_value(self, history: History, player: Player, player_strategy=None, opp_strategy=None): - """ - We can compute the expected values of two strategies. If none, then we will - play both according to the nash equilibrium strategies we computed. + def to_dict(self): + return { + "infoset": self.infoSet, + "regret": self.regret, + "cumulative_strategy": self.cumulative_strategy, + } - However, Getting the expected value this way is not feasible for super large games such as - no-limit texas hold'em, which is why we can compute an approximate EV (see function below). - - This is also known as the expected payoff, or utility function of a strategy profile $u_i(\sigma)$ - """ - # the counterfactual value is simply the averaged utilities possible - if history.is_terminal(): - return history.terminal_utility(player) - else: - infoSet = self.get_infoSet(history) - - if history.player() == player: - if player_strategy is not None: - average_strategy = player_strategy - else: - average_strategy = infoSet.get_average_strategy() - else: - if opp_strategy is not None: - average_strategy = opp_strategy - else: - average_strategy = infoSet.get_average_strategy() - - ev = 0 - for idx, a in enumerate(infoSet.actions()): - value = self.get_expected_value(history + a, player, player_strategy, opp_strategy) - ev += average_strategy[idx] * value - - return ev - - def get_expected_value_approx(self, history: History, player: Player): - # Getting the expected value this way is not feasible for super large games because the branching factor is too big - if history.is_terminal(): - return history.terminal_utility(player) - else: - infoSet = 
self.get_infoSet(history) - - average_strategy = infoSet.get_average_strategy() - ev = 0 - for a in infoSet.actions(): - value = self.get_expected_value(history + a, (player + 1) % 2) - ev += average_strategy[a] * value - - return ev - - def get_best_response(self, history: History, player: Player, player_strategy=None): - """ - TODO: This only works when the action space is constant throughout. We need something more customized. - - A best response is deterministic. It is a strategy profile that chooses the action that maximizes - its expected value. - - If player_strategy is provided, it will be computed from the player's strategy profile. - - Else, calculate the best response from the nash equilibrium. - - Cannot be a terminal history. - - returns the action index with the lowest EV (This is what the opponent should play). - We do this by playing all actions with equal probability. - + def get_strategy(self): + """ + Updates the current strategy based on the current regret, using regret matching + """ + regret = {a: max(r, 0) for a, r in self.regret.items()} - returns (action_idx, action_ev) - """ - assert(not history.is_terminal()) - assert(history.player() == player) # it should be the player's turn - - infoSet = self.get_infoSet(history) - - ev = [] - if player_strategy: - average_strategy = player_strategy - else: - average_strategy = infoSet.get_average_strategy() # Use the strategy computed if no strategy provided + regret_sum = sum(regret.values()) - # Find the action that is a best response (gives the lowest EV) to player i's strategy profile - - """ + if regret_sum > 0: + self.strategy = {a: r / regret_sum for a, r in regret.items()} + else: + self.strategy = {a: 1 / len(self.actions()) for a in self.actions()} + + def get_average_strategy(self): + """ """ + assert len(self.actions()) == len( + self.cumulative_strategy + ) # The cumulative strategy should map a probability for every action + + strategy_sum = sum(self.cumulative_strategy.values()) + + if 
strategy_sum > 0: + return {a: s / strategy_sum for a, s in self.cumulative_strategy.items()} + else: + return {a: 1 / len(self.actions()) for a in self.actions()} + + +class CFR: + def __init__( + self, + create_infoSet, + create_history, + n_players: int = 2, + iterations: int = 1000000, + tracker_interval=50000, + ): + self.n_players = n_players + self.iterations = iterations + self.tracker_interval = tracker_interval + self.infoSets: Dict[str, InfoSet] = {} + self.create_infoSet = create_infoSet + self.create_history = create_history + + self.tracker = InfoSetTracker() + + def get_infoSet(self, history: History) -> InfoSet: + infoSet_key = history.get_infoSet_key() + actions = history.actions() + player = history.player() + + assert type(infoSet_key) == list + assert type(actions) == list + + infoSet_key_str = "".join(infoSet_key) + if infoSet_key_str not in self.infoSets: + self.infoSets[infoSet_key_str] = self.create_infoSet(infoSet_key, actions, player) + + return self.infoSets[infoSet_key_str] + + def vanilla_cfr( + self, history: History, i: Player, t: int, pi_0: float, pi_1: float, debug=False + ): # Works for two players + # Return payoff for terminal states + if history.is_terminal(): + if debug: + print(f"history: {history.history} utility: {history.terminal_utility(i)}") + time.sleep(1) + return history.terminal_utility(i) + elif history.is_chance(): + a = ( + history.sample_chance_outcome() + ) # $\sigma_c$ is simply the $f_c$ function I believe... + return self.vanilla_cfr( + history + a, i, t, pi_0, pi_1, debug=debug + ) # Since it is a chance outcome, the player does not change .. 
TODO: Check logic for this + + infoSet = self.get_infoSet(history) + assert infoSet.player() == history.player() + + if debug: + print("infoset", infoSet.to_dict()) + + v = 0 + va = {} + + for a in infoSet.actions(): + if history.player() == 0: + va[a] = self.vanilla_cfr( + history + a, i, t, infoSet.strategy[a] * pi_0, pi_1, debug=debug + ) + else: + va[a] = self.vanilla_cfr( + history + a, i, t, pi_0, infoSet.strategy[a] * pi_1, debug=debug + ) + + v += infoSet.strategy[a] * va[a] + + if history.player() == i: + for a in infoSet.actions(): + infoSet.regret[a] += (pi_1 if i == 0 else pi_0) * (va[a] - v) + # Update cumulative strategy values, this will be used to calculate the average strategy at the end + infoSet.cumulative_strategy[a] += (pi_0 if i == 0 else pi_1) * infoSet.strategy[a] + + # Update regret matching values + infoSet.get_strategy() + + return v + + def vanilla_cfr_speedup(self, history: History, t: int, pi_0: float, pi_1: float, debug=False): + """ + We double the speed by updating both player values simultaneously, since this is a zero-sum game. + + """ + # Return payoff for terminal states + if history.is_terminal(): + if debug: + print(history.history, history.terminal_utility(0)) + time.sleep(1) + return history.terminal_utility(0) + elif history.is_chance(): + a = ( + history.sample_chance_outcome() + ) # $\sigma_c$ is simply the $f_c$ function I believe... + return self.vanilla_cfr_speedup( + history + a, t, pi_0, pi_1, debug=debug + ) # Since it is a chance outcome, the player does not change .. 
TODO: Check logic for this + + infoSet = self.get_infoSet(history) + assert infoSet.player() == history.player() + + if debug: + print("infoset", infoSet.to_dict()) + + v = 0 + va = {} + + for a in infoSet.actions(): + if history.player() == 0: + va[a] = -self.vanilla_cfr_speedup( + history + a, t, infoSet.strategy[a] * pi_0, pi_1, debug=debug + ) + else: + va[a] = -self.vanilla_cfr_speedup( + history + a, t, pi_0, infoSet.strategy[a] * pi_1, debug=debug + ) + + v += infoSet.strategy[a] * va[a] + + for a in infoSet.actions(): + infoSet.regret[a] += (pi_1 if history.player() == 0 else pi_0) * (va[a] - v) + # Update cumulative strategy values, this will be used to calculate the average strategy at the end + infoSet.cumulative_strategy[a] += ( + pi_0 if history.player() == 0 else pi_1 + ) * infoSet.strategy[a] + + # Update regret matching values + infoSet.get_strategy() + + return v + + def vanilla_cfr_manim( + self, + history: History, + i: Player, + t: int, + pi_0: float, + pi_1: float, + histories: List[History], + ): + # Return payoff for terminal states + if history.is_terminal(): + histories.append(history) + return history.terminal_utility(i) + elif history.is_chance(): + a = ( + history.sample_chance_outcome() + ) # $\sigma_c$ is simply the $f_c$ function I believe... + return self.vanilla_cfr_manim( + history + a, i, t, pi_0, pi_1, histories + ) # Since it is a chance outcome, the player does not change .. 
TODO: Check logic for this + + infoSet = self.get_infoSet(history) + assert infoSet.player() == history.player() + + v = 0 + va = {} + + for a in infoSet.actions(): + if history.player() == 0: + va[a] = self.vanilla_cfr_manim( + history + a, i, t, infoSet.strategy[a] * pi_0, pi_1, histories + ) + else: + va[a] = self.vanilla_cfr_manim( + history + a, i, t, pi_0, infoSet.strategy[a] * pi_1, histories + ) + + v += infoSet.strategy[a] * va[a] + + if history.player() == i: + for a in infoSet.actions(): + infoSet.regret[a] += (pi_1 if i == 0 else pi_0) * (va[a] - v) + # Update cumulative strategy values, this will be used to calculate the average strategy at the end + infoSet.cumulative_strategy[a] += (pi_0 if i == 0 else pi_1) * infoSet.strategy[a] + + # Update regret matching values + infoSet.get_strategy() + + return v + + def mccfr( + self, history: History, i: Player, t: int, pi_0: float, pi_1: float, debug=False + ): # Works for two players + return + + def solve(self, method="vanilla_speedup", debug=False): + util_0 = 0 + util_1 = 0 + if method == "manim": + histories = [] + + for t in tqdm(range(self.iterations), desc="CFR Training Loop"): + if method == "vanilla_speedup": + util_0 += self.vanilla_cfr_speedup(self.create_history(), t, 1, 1, debug=debug) + + elif method == "manim" and t < 10: + for player in range(self.n_players): + if player == 0: + util_0 += self.vanilla_cfr_manim( + self.create_history(), player, t, 1, 1, histories + ) + else: + util_1 += self.vanilla_cfr_manim( + self.create_history(), player, t, 1, 1, histories + ) + + print(histories) + + else: # vanilla + for player in range( + self.n_players + ): # This is the slower way, we can speed by updating both players + if player == 0: + util_0 += self.vanilla_cfr( + self.create_history(), player, t, 1, 1, debug=debug + ) + else: + util_1 += self.vanilla_cfr( + self.create_history(), player, t, 1, 1, debug=debug + ) + + if (t + 1) % self.tracker_interval == 0: + print("Average game value player 0: 
", util_0 / t) + print("Average game value player 1: ", util_1 / t) + self.tracker(self.infoSets) + self.tracker.pprint() + + if method == "manim": + return histories + + def export_infoSets(self): + joblib.dump(self.infoSets, "holdem_infoSets.joblib") + + def get_expected_value( + self, history: History, player: Player, player_strategy=None, opp_strategy=None + ): + """ + We can compute the expected values of two strategies. If none, then we will + play both according to the nash equilibrium strategies we computed. + + However, Getting the expected value this way is not feasible for super large games such as + no-limit texas hold'em, which is why we can compute an approximate EV (see function below). + + This is also known as the expected payoff, or utility function of a strategy profile $u_i(\sigma)$ + """ + # the counterfactual value is simply the averaged utilities possible + if history.is_terminal(): + return history.terminal_utility(player) + else: + infoSet = self.get_infoSet(history) + + if history.player() == player: + if player_strategy is not None: + average_strategy = player_strategy + else: + average_strategy = infoSet.get_average_strategy() + else: + if opp_strategy is not None: + average_strategy = opp_strategy + else: + average_strategy = infoSet.get_average_strategy() + + ev = 0 + for idx, a in enumerate(infoSet.actions()): + value = self.get_expected_value(history + a, player, player_strategy, opp_strategy) + ev += average_strategy[idx] * value + + return ev + + def get_expected_value_approx(self, history: History, player: Player): + # Getting the expected value this way is not feasible for super large games because the branching factor is too big + if history.is_terminal(): + return history.terminal_utility(player) + else: + infoSet = self.get_infoSet(history) + + average_strategy = infoSet.get_average_strategy() + ev = 0 + for a in infoSet.actions(): + value = self.get_expected_value(history + a, (player + 1) % 2) + ev += average_strategy[a] * 
value + + return ev + + def get_best_response(self, history: History, player: Player, player_strategy=None): + """ + TODO: This only works when the action space is constant throughout. We need something more customized. + + A best response is deterministic. It is a strategy profile that chooses the action that maximizes + its expected value. + + If player_strategy is provided, it will be computed from the player's strategy profile. + + Else, calculate the best response from the nash equilibrium. + + Cannot be a terminal history. + + returns the action index with the lowest EV (This is what the opponent should play). + We do this by playing all actions with equal probability. + + + returns (action_idx, action_ev) + """ + assert not history.is_terminal() + assert history.player() == player # it should be the player's turn + + infoSet = self.get_infoSet(history) + + ev = [] + if player_strategy: + average_strategy = player_strategy + else: + average_strategy = ( + infoSet.get_average_strategy() + ) # Use the strategy computed if no strategy provided + + # Find the action that is a best response (gives the lowest EV) to player i's strategy profile + + """ The algorithm is the following: 1. For each action that the opponent can play after our decision, we see what happens if our opponent sticks to that strategy. Whatever action our opponent chooses that minimizes our expected value is the one. """ - sample_a = infoSet.actions()[0] - sample_opp_history = history + sample_a # TODO: This does not work for Hold'EM? 
Because some actions, like folding or calling, end the current game stage, and then it's our turn again - sample_opp_infoSet = self.get_infoSet(sample_opp_history) - opp_actions = sample_opp_infoSet.actions() - for opp_idx, opp_action in enumerate(opp_actions): - # Create deterministic opponent strategy - opp_strategy = np.zeros(len(opp_actions)) - opp_strategy[opp_idx] = 1.0 - ev_opp_action = 0 - for idx, a in enumerate(infoSet.actions()): - value = self.get_expected_value(history + a, player, average_strategy, opp_strategy=opp_strategy) - ev_opp_action += average_strategy[idx] * value - print(ev_opp_action) - - ev.append(ev_opp_action) - - br_action = np.argmin(ev) - return br_action, min(ev) + sample_a = infoSet.actions()[0] + sample_opp_history = ( + history + sample_a + ) # TODO: This does not work for Hold'EM? Because some actions, like folding or calling, end the current game stage, and then it's our turn again + sample_opp_infoSet = self.get_infoSet(sample_opp_history) + opp_actions = sample_opp_infoSet.actions() + for opp_idx, opp_action in enumerate(opp_actions): + # Create deterministic opponent strategy + opp_strategy = np.zeros(len(opp_actions)) + opp_strategy[opp_idx] = 1.0 + ev_opp_action = 0 + for idx, a in enumerate(infoSet.actions()): + value = self.get_expected_value( + history + a, player, average_strategy, opp_strategy=opp_strategy + ) + ev_opp_action += average_strategy[idx] * value + print(ev_opp_action) + + ev.append(ev_opp_action) + + br_action = np.argmin(ev) + return br_action, min(ev) class InfoSetTracker: - """ - We also want to use this to track exploitability - """ - def __init__(self): - self.tracker_hist = [] - self.exploitability: Dict[int: float] = {} # A dictionary of exploitability for index - # tracker.set_histogram(f'strategy.*') - # tracker.set_histogram(f'average_strategy.*') - # tracker.set_histogram(f'regret.*') - - def __call__(self, infoSets: Dict[str, InfoSet]): - self.tracker_hist.append(infoSets) - - def pprint(self): 
- infoSets = self.tracker_hist[-1] - for infoSet in infoSets.values(): - print(infoSet.infoSet, infoSet.get_average_strategy()) - + """ + We also want to use this to track exploitability + """ + + def __init__(self): + self.tracker_hist = [] + self.exploitability: Dict[int:float] = {} # A dictionary of exploitability for index + # tracker.set_histogram(f'strategy.*') + # tracker.set_histogram(f'average_strategy.*') + # tracker.set_histogram(f'regret.*') + + def __call__(self, infoSets: Dict[str, InfoSet]): + self.tracker_hist.append(infoSets) + + def pprint(self): + infoSets = self.tracker_hist[-1] + for infoSet in infoSets.values(): + print(infoSet.infoSet, infoSet.get_average_strategy()) diff --git a/src/kuhn.py b/src/kuhn.py index f032edf..d2e42ca 100644 --- a/src/kuhn.py +++ b/src/kuhn.py @@ -4,130 +4,136 @@ from typing import NewType, Dict, List, Callable, cast import copy + class KuhnHistory(base.History): - """ - Example of history: - First two actions are the cards dealt to the players. The rest of the actions are the actions taken by the players. - 1. ['1', '2', 'b', 'p', 'b', 'p'] - 2. 
['2', '3', 'p', 'p'] - - """ - def __init__(self, history: List[Action] = []): - super().__init__(history) - - def is_terminal(self): - plays = len(self.history) - - if plays > 3: - player_card = self.history[0] - opponent_card = self.history[1] - - assert(player_card in ['1', '2', '3']) - assert(opponent_card in ['1', '2', '3']) - assert(player_card != opponent_card) - - terminalPass = self.history[-1] == 'p' - doubleBet = self.history[-2:] == ['b', 'b'] - - if terminalPass or doubleBet: - return True - - else: - return False - - def actions(self): - if self.is_chance(): - if len(self.history) == 0: - return ['1', '2', '3'] - else: - cards = ['1', '2', '3'] - cards.remove(self.history[0]) # Two players cannot get the same cards - return cards - - elif not self.is_terminal(): - return ['p', 'b'] - - else: - raise Exception("No actions available for terminal history") - - def player(self): - plays = len(self.history) - if plays <= 1: - return -1 - else: - return plays % 2 - - def is_chance(self): - return super().is_chance() - - def sample_chance_outcome(self): - assert(self.is_chance()) - - cards = self.actions() - return random.choice(cards) # Sample one of the cards with equal probability - - def terminal_utility(self, i: Player) -> int: - assert(self.is_terminal()) # We can only call the utility for a terminal history - assert(i in [0, 1]) # Only works for 2 player games for now - - terminalPass = self.history[-1] == 'p' - doubleBet = self.history[-2:] == ['b', 'b'] - - player_card = self.history[i % 2] - opponent_card = self.history[(i + 1) % 2] - is_player_winner = player_card > opponent_card - - if terminalPass: - if self.history[-2:] == ['p', 'p']: - return 1 if is_player_winner else -1 - else: - if len(self.history) % 2 == 0: # i.e.: bp - return 1 if i == 0 else -1 - else: # i.e.: pbp - return 1 if i == 1 else -1 - else: - return 2 if is_player_winner else -2 - - def __add__(self, action: Action): - new_history = KuhnHistory(self.history + [action]) - 
return new_history - - def get_infoSet_key(self) -> List[Action]: - assert(not self.is_chance()) # chance history should not be infosets - assert(not self.is_terminal()) # terminal history is not an infoset - - player = self.player() - if player == 0: - history = copy.deepcopy(self.history) - history[1] = '?' # Unknown card - return history - else: - history = copy.deepcopy(self.history) - history[0] = '?' # Unknown card - return history - + """ + Example of history: + First two actions are the cards dealt to the players. The rest of the actions are the actions taken by the players. + 1. ['1', '2', 'b', 'p', 'b', 'p'] + 2. ['2', '3', 'p', 'p'] + + """ + + def __init__(self, history: List[Action] = []): + super().__init__(history) + + def is_terminal(self): + plays = len(self.history) + + if plays > 3: + player_card = self.history[0] + opponent_card = self.history[1] + + assert player_card in ["1", "2", "3"] + assert opponent_card in ["1", "2", "3"] + assert player_card != opponent_card + + terminalPass = self.history[-1] == "p" + doubleBet = self.history[-2:] == ["b", "b"] + + if terminalPass or doubleBet: + return True + + else: + return False + + def actions(self): + if self.is_chance(): + if len(self.history) == 0: + return ["1", "2", "3"] + else: + cards = ["1", "2", "3"] + cards.remove(self.history[0]) # Two players cannot get the same cards + return cards + + elif not self.is_terminal(): + return ["p", "b"] + + else: + raise Exception("No actions available for terminal history") + + def player(self): + plays = len(self.history) + if plays <= 1: + return -1 + else: + return plays % 2 + + def is_chance(self): + return super().is_chance() + + def sample_chance_outcome(self): + assert self.is_chance() + + cards = self.actions() + return random.choice(cards) # Sample one of the cards with equal probability + + def terminal_utility(self, i: Player) -> int: + assert self.is_terminal() # We can only call the utility for a terminal history + assert i in [0, 1] # Only 
works for 2 player games for now + + terminalPass = self.history[-1] == "p" + doubleBet = self.history[-2:] == ["b", "b"] + + player_card = self.history[i % 2] + opponent_card = self.history[(i + 1) % 2] + is_player_winner = player_card > opponent_card + + if terminalPass: + if self.history[-2:] == ["p", "p"]: + return 1 if is_player_winner else -1 + else: + if len(self.history) % 2 == 0: # i.e.: bp + return 1 if i == 0 else -1 + else: # i.e.: pbp + return 1 if i == 1 else -1 + else: + return 2 if is_player_winner else -2 + + def __add__(self, action: Action): + new_history = KuhnHistory(self.history + [action]) + return new_history + + def get_infoSet_key(self) -> List[Action]: + assert not self.is_chance() # chance history should not be infosets + assert not self.is_terminal() # terminal history is not an infoset + + player = self.player() + if player == 0: + history = copy.deepcopy(self.history) + history[1] = "?" # Unknown card + return history + else: + history = copy.deepcopy(self.history) + history[0] = "?" # Unknown card + return history + class KuhnInfoSet(base.InfoSet): - """ - Information Sets (InfoSets) cannot be chance histories, nor terminal histories. - This condition is checked when infosets are created. - - """ - def __init__(self, infoSet: List[Action], actions: List[Action], player: Player): - assert(len(infoSet) >= 2) - super().__init__(infoSet, actions, player) + """ + Information Sets (InfoSets) cannot be chance histories, nor terminal histories. + This condition is checked when infosets are created. + + """ + + def __init__(self, infoSet: List[Action], actions: List[Action], player: Player): + assert len(infoSet) >= 2 + super().__init__(infoSet, actions, player) + def create_infoSet(infoSet_key: List[Action], actions: List[Action], player: Player): - """ - We create an information set from a history. - """ - return KuhnInfoSet(infoSet_key, actions, player) - - + """ + We create an information set from a history. 
+ """ + return KuhnInfoSet(infoSet_key, actions, player) + + def create_history(): - return KuhnHistory() - + return KuhnHistory() + if __name__ == "__main__": - cfr = base.CFR(create_infoSet, create_history) - cfr.solve() \ No newline at end of file + cfr = base.CFR(create_infoSet, create_history) + cfr.solve() + # TODO: Add playing option, right now there is old code in research/kuhn, + # which is not oop diff --git a/src/train.py b/src/train.py index 6ad467e..7cfad06 100644 --- a/src/train.py +++ b/src/train.py @@ -1,4 +1,3 @@ - """ Before looking at this code, you should probably familiarize with a simpler implementation of CFR with Kuhn Poker, under `research/kuhn/train.py`. That version of the game @@ -7,6 +6,7 @@ This is the main training file for Poker CFR. It generates a blueprint strategy. Then, we will improve it in real-time by implementing depth-limited solving. """ + from typing import List from evaluator import Deck, CombinedHand, Evaluator, Card import numpy as np @@ -21,230 +21,300 @@ 2) Check/Call 3) Bet/Raise """ -NUM_ACTIONS = 3 +NUM_ACTIONS = 3 nodeMap = {} startIterations = 0 + class Node: - def __init__(self) -> None: - self.infoSet = "" - self.regretSum = np.zeros(NUM_ACTIONS) - self.strategy = np.zeros(NUM_ACTIONS) - self.strategySum = np.zeros(NUM_ACTIONS) - - def describe(self): - print("Infoset: {} -> Strategy at this infoset: {}, RegretSum: {}".format(self.infoSet, np.around(self.getAverageStrategy(), 2), self.regretSum.sum())) - - def getStrategy(self, realization_weight): - for a in range(NUM_ACTIONS): - self.strategy[a] = max(0, self.regretSum[a]) - - normalizingSum = self.strategy.sum() - for a in range(NUM_ACTIONS): - if (normalizingSum > 0): - self.strategy[a] /= normalizingSum - else: - self.strategy[a] = 1 / NUM_ACTIONS - - self.strategySum[a] += realization_weight * self.strategy[a] - - return self.strategy - - def getAverageStrategy(self): - normalizingSum = self.strategySum.sum() - avgStrategy = np.zeros(NUM_ACTIONS) - for a 
in range(NUM_ACTIONS): - if (normalizingSum > 0): - avgStrategy[a] = self.strategySum[a] / normalizingSum - else: - avgStrategy[a] = 1 / NUM_ACTIONS - - return avgStrategy - - -class History(): - # SB bet size = 1, BB bet size = 2 - def __init__(self): - self.total_pot_size = 0 - self.history_str = "" - self.min_bet_size = 2 - self.game_stage = 2 - self.curr_round_plays = 0 # if self.curr_round_plays == 0 and we check, then we DON'T move to the next game stage - -all_history = [] # Global variable to store all histories -def cfr(all_community_cards: List[Card], private_cards: List[CombinedHand], community_cards: CombinedHand, history: History, p0, p1): - """ - player_cards: [user_cards, opponent_cards] - community_cards: "" for community (board) cards - - To compare cards, we get the binary representation. - """ - # print(history.history_str) - plays = len(history.history_str) - player = plays % 2 - opponent = 1 - player - - # Return payoff for terminal states - if (plays >= 1): - if history.history_str[-1] == 'f': # Fold, just calculate total value - all_history.append({"history": history.history_str, "player_cards": private_cards[0].as_list(), "opponent_cards": private_cards[1].as_list(), "community_cards": [str(x) for x in all_community_cards]}) - return history.total_pot_size - - elif history.game_stage == 6: - # Showdown - all_history.append({"history": history.history_str, "player_cards": private_cards[0].as_list(), "opponent_cards": private_cards[1].as_list(), "community_cards": [str(x) for x in all_community_cards]}) - hand = copy.deepcopy(CombinedHand()) - hand.add_combined_hands(community_cards, private_cards[player]) - - opponent_hand = copy.deepcopy(CombinedHand()) - opponent_hand.add_combined_hands(community_cards, private_cards[opponent]) - - evaluator = copy.deepcopy(Evaluator()) - evaluator.add_hands(hand, opponent_hand) - - assert(len(hand) == 7) - assert(len(opponent_hand) == 7) - assert(len(evaluator.hands) == 2) - - winners = 
evaluator.get_winner() - # print("Showdown time! Winner(s):", winners) - - assert(len(winners) > 0) # At least one winner - - if len(winners) == 2: # Tie - return history.total_pot_size / 2 - else: - if winners[0] == 0: - return history.total_pot_size - else: - return - history.total_pot_size - - if community_cards == None: - infoSet = private_cards[player].get_binary_representation() + history.history_str - else: - infoSet = private_cards[player].get_binary_representation() + community_cards.get_binary_representation() + history.history_str - # Get information set node or create it if nonexistant - if infoSet not in nodeMap: - node = Node() - node.infoSet = infoSet - nodeMap[infoSet] = node - else: - node = nodeMap[infoSet] - - # For each action, recursively call cfr with additional history and probability - strategy = node.getStrategy(p0 if player == 0 else p1) - util = np.zeros(NUM_ACTIONS) - nodeUtil = 0 - - for a in range(NUM_ACTIONS): - nextHistory = copy.deepcopy(history) - new_community_cards = copy.deepcopy(community_cards) - - nextHistory.curr_round_plays += 1 - - if a == 0: - nextHistory.history_str += "f" # fold - - elif a == 1: - nextHistory.history_str += "c" # Check/Call - nextHistory.total_pot_size += nextHistory.min_bet_size - if nextHistory.curr_round_plays > 1: # We move to to the next game_stage if there is more than one play - nextHistory.game_stage += 1 - nextHistory.curr_round_plays = 0 - nextHistory.min_bet_size = 0 # You don't have to bet anything - - if nextHistory.game_stage == 3: # Flop - new_community_cards = CombinedHand(all_community_cards[:3]) - assert(len(new_community_cards) == 3) - elif nextHistory.game_stage == 4: # Turn - new_community_cards.add_cards(all_community_cards[3]) - assert(len(new_community_cards) == 4) - elif nextHistory.game_stage == 5: # River - new_community_cards.add_cards(all_community_cards[4]) - assert(len(new_community_cards) == 5) - - else: - nextHistory.history_str += "r" # Bet/Raise - if 
(len(nextHistory.history_str) > 3) and nextHistory.history_str[-3:] == 'rrr': continue # To prevent infinite raises, we just don't consider this node - - # TODO: Change this, since this is not how limit hold'em works - if nextHistory.min_bet_size == 0: - nextHistory.min_bet_size = 1 - else: - nextHistory.min_bet_size *= 2 - - nextHistory.total_pot_size += nextHistory.min_bet_size - - - util[a] = - cfr(all_community_cards, private_cards, new_community_cards, nextHistory, p0 * strategy[a], p1) if player == 0 else - cfr(all_community_cards, private_cards, new_community_cards, nextHistory, p0, p1 * strategy[a]) - nodeUtil += strategy[a] * util[a] - - # For each action, compute and accumulate counterfactual regret - for a in range(NUM_ACTIONS): - regret = util[a] - nodeUtil - node.regretSum[a] += (p1 if player == 0 else p0) * regret - return nodeUtil + def __init__(self) -> None: + self.infoSet = "" + self.regretSum = np.zeros(NUM_ACTIONS) + self.strategy = np.zeros(NUM_ACTIONS) + self.strategySum = np.zeros(NUM_ACTIONS) + + def describe(self): + print( + "Infoset: {} -> Strategy at this infoset: {}, RegretSum: {}".format( + self.infoSet, np.around(self.getAverageStrategy(), 2), self.regretSum.sum() + ) + ) + + def getStrategy(self, realization_weight): + for a in range(NUM_ACTIONS): + self.strategy[a] = max(0, self.regretSum[a]) + + normalizingSum = self.strategy.sum() + for a in range(NUM_ACTIONS): + if normalizingSum > 0: + self.strategy[a] /= normalizingSum + else: + self.strategy[a] = 1 / NUM_ACTIONS + + self.strategySum[a] += realization_weight * self.strategy[a] + + return self.strategy + + def getAverageStrategy(self): + normalizingSum = self.strategySum.sum() + avgStrategy = np.zeros(NUM_ACTIONS) + for a in range(NUM_ACTIONS): + if normalizingSum > 0: + avgStrategy[a] = self.strategySum[a] / normalizingSum + else: + avgStrategy[a] = 1 / NUM_ACTIONS + + return avgStrategy + + +class History: + # SB bet size = 1, BB bet size = 2 + def __init__(self): + 
self.total_pot_size = 0 + self.history_str = "" + self.min_bet_size = 2 + self.game_stage = 2 + self.curr_round_plays = 0 # if self.curr_round_plays == 0 and we check, then we DON'T move to the next game stage + + +all_history = [] # Global variable to store all histories + + +def cfr( + all_community_cards: List[Card], + private_cards: List[CombinedHand], + community_cards: CombinedHand, + history: History, + p0, + p1, +): + """ + player_cards: [user_cards, opponent_cards] + community_cards: "" for community (board) cards + + To compare cards, we get the binary representation. + """ + # print(history.history_str) + plays = len(history.history_str) + player = plays % 2 + opponent = 1 - player + + # Return payoff for terminal states + if plays >= 1: + if history.history_str[-1] == "f": # Fold, just calculate total value + all_history.append( + { + "history": history.history_str, + "player_cards": private_cards[0].as_list(), + "opponent_cards": private_cards[1].as_list(), + "community_cards": [str(x) for x in all_community_cards], + } + ) + return history.total_pot_size + + elif history.game_stage == 6: + # Showdown + all_history.append( + { + "history": history.history_str, + "player_cards": private_cards[0].as_list(), + "opponent_cards": private_cards[1].as_list(), + "community_cards": [str(x) for x in all_community_cards], + } + ) + hand = copy.deepcopy(CombinedHand()) + hand.add_combined_hands(community_cards, private_cards[player]) + + opponent_hand = copy.deepcopy(CombinedHand()) + opponent_hand.add_combined_hands(community_cards, private_cards[opponent]) + + evaluator = copy.deepcopy(Evaluator()) + evaluator.add_hands(hand, opponent_hand) + + assert len(hand) == 7 + assert len(opponent_hand) == 7 + assert len(evaluator.hands) == 2 + + winners = evaluator.get_winner() + # print("Showdown time! 
Winner(s):", winners) + + assert len(winners) > 0 # At least one winner + if len(winners) == 2: # Tie + return history.total_pot_size / 2 + else: + if winners[0] == 0: + return history.total_pot_size + else: + return -history.total_pot_size + + if community_cards == None: + infoSet = private_cards[player].get_binary_representation() + history.history_str + else: + infoSet = ( + private_cards[player].get_binary_representation() + + community_cards.get_binary_representation() + + history.history_str + ) + # Get information set node or create it if nonexistant + if infoSet not in nodeMap: + node = Node() + node.infoSet = infoSet + nodeMap[infoSet] = node + else: + node = nodeMap[infoSet] + + # For each action, recursively call cfr with additional history and probability + strategy = node.getStrategy(p0 if player == 0 else p1) + util = np.zeros(NUM_ACTIONS) + nodeUtil = 0 + + for a in range(NUM_ACTIONS): + nextHistory = copy.deepcopy(history) + new_community_cards = copy.deepcopy(community_cards) + + nextHistory.curr_round_plays += 1 + + if a == 0: + nextHistory.history_str += "f" # fold + + elif a == 1: + nextHistory.history_str += "c" # Check/Call + nextHistory.total_pot_size += nextHistory.min_bet_size + if ( + nextHistory.curr_round_plays > 1 + ): # We move to to the next game_stage if there is more than one play + nextHistory.game_stage += 1 + nextHistory.curr_round_plays = 0 + nextHistory.min_bet_size = 0 # You don't have to bet anything + + if nextHistory.game_stage == 3: # Flop + new_community_cards = CombinedHand(all_community_cards[:3]) + assert len(new_community_cards) == 3 + elif nextHistory.game_stage == 4: # Turn + new_community_cards.add_cards(all_community_cards[3]) + assert len(new_community_cards) == 4 + elif nextHistory.game_stage == 5: # River + new_community_cards.add_cards(all_community_cards[4]) + assert len(new_community_cards) == 5 + + else: + nextHistory.history_str += "r" # Bet/Raise + if (len(nextHistory.history_str) > 3) and 
# Running average game value; train() appends one entry per iteration.
averageUtils = []


def train(iterations, save=True):
    """Run CFR over freshly dealt hands and track the average game value.

    Each iteration resets the deck, deals two private cards to each of the
    two players, pre-draws five community cards, and calls ``cfr`` from an
    empty ``History`` with both reach probabilities set to 1.

    Args:
        iterations: Exclusive upper bound of the iteration index. The loop
            starts at the module-level ``startIterations``.
        save: When true, ``nodeMap``, ``all_history`` and ``averageUtils``
            are dumped with joblib whenever the iteration index is a
            multiple of 100.

    Returns:
        None. Results accumulate in the module-level ``nodeMap`` (via
        ``cfr``) and ``averageUtils``.
    """
    deck = Deck()
    util = 0
    for i in tqdm(range(startIterations, iterations), desc="Training Loop"):
        deck.reset_deck()
        player_cards = CombinedHand([deck.draw(), deck.draw()])
        opponent_cards = CombinedHand([deck.draw(), deck.draw()])

        # The whole board is drawn up front; cfr reveals it stage by stage.
        all_community_cards = [deck.draw() for _ in range(5)]

        # Nine cards have been drawn; 43 left presumes a 52-card deck.
        assert deck.total_remaining_cards == 43

        private_cards = [player_cards, opponent_cards]
        # community_cards=None: no community cards are visible at the root.
        util += cfr(all_community_cards, private_cards, None, History(), 1, 1)

        # Average over the games played in this run. Dividing by the raw
        # index `i` divides by zero when the run starts at index 0 and is
        # off by one (or worse, when resuming) everywhere else.
        games_played = i - startIterations + 1
        average_util = util / games_played
        print("Average game value: ", average_util)
        averageUtils.append(average_util)

        if save and i % 100 == 0:
            joblib.dump(nodeMap, "HoldemNodeMap.joblib")
            joblib.dump(all_history, "HoldemTrainingHistory.joblib")
            joblib.dump(averageUtils, "averageUtils.joblib")
if __name__ == "__main__":
    # Command-line entry point: optionally resume from saved state, train
    # (unless --visualize is given), then print every information set.

    # NOTE(review): nothing in this block reads train_from_scratch; kept in
    # case code elsewhere in the file does.
    train_from_scratch = True  # Set this to True if you want to retrain from scratch
    parser = argparse.ArgumentParser(description="Train a Hold'Em AI.")
    parser.add_argument(
        "-s",
        "--save",
        action="store_true",
        dest="save",
        default=True,
        help="Save the trained model and history",
    )
    # --save defaults to True, so on its own it can never be switched off.
    # --no-save writes False to the same destination.
    parser.add_argument(
        "--no-save",
        action="store_false",
        dest="save",
        help="Do not save the trained model and history",
    )
    parser.add_argument(
        "-l",
        "--load",
        action="store_true",
        dest="load",
        default=False,
        help="Load the trained model and history to resume training",
    )
    parser.add_argument(
        "-v",
        "--visualize",
        action="store_true",
        dest="visualize",
        default=False,
        help="Print out all information sets with their corresponding strategy. Do NOT train",
    )

    args = parser.parse_args()
    save = args.save  # Save history and information set
    load = args.load
    visualize = args.visualize
    if load:
        nodeMap = joblib.load("HoldemNodeMap.joblib")
        # NOTE(review): train() dumps a global named `all_history`, but the
        # saved history is loaded into `history` here - confirm which name
        # is intended.
        history = joblib.load("HoldemTrainingHistory.joblib")
        averageUtils = joblib.load("averageUtils.joblib")

        assert len(nodeMap) > 0
        assert len(history) > 0
        # NOTE(review): startIterations is not assigned in this block, so it
        # must already be positive elsewhere for a resume to pass - verify.
        assert startIterations > 0

    if visualize:
        # Nothing was trained in this run, so show the saved checkpoint.
        nodeMap = joblib.load("HoldemNodeMap.joblib")  # Load information sets
    else:
        # After training, the in-memory nodeMap is the freshest state: the
        # checkpoint is only written every 100th iteration and does not
        # exist at all when saving is off.
        train(1000000, save)

    print("Total Number of Infosets:", len(nodeMap))
    for node in nodeMap.values():
        node.describe()