-
Notifications
You must be signed in to change notification settings - Fork 0
/
game.py
297 lines (241 loc) · 9.72 KB
/
game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import collections
from copy import copy
from enum import Enum
import functools
from types import MethodType
import itertools
import tornado.concurrent
class Orientation(Enum):
    """Direction in which a ship extends from its top-left coordinate."""
    horizontal = 1
    vertical = 2


class GameState(Enum):
    """State of the game as seen from the perspective of one player."""
    won = 1
    lost = 2
    canPlay = 3
    wait = 4


class ShotResult(Enum):
    """Outcome of shooting a single field on the opponent's grid."""
    hit = 1
    miss = 2
    sunk = 3
    alreadyShot = 4
class Ship(object):
    """
    A ship with a name, a size and a counter of fields that have not been hit yet.
    """

    def __init__(self, name, size, fields_intact=None):
        """
        :param name: name of the ship (used in the repr)
        :param size: number of fields the ship occupies
        :param fields_intact: number of fields not hit yet; defaults to ``size`` when omitted
        """
        self.name = name
        self.size = size
        # Test against None explicitly: 0 is a legitimate value (a completely
        # sunk ship) and must not be replaced by the full size.
        self.fields_intact = size if fields_intact is None else fields_intact
        # Coordinates occupied by the ship; empty until the ship is placed.
        self.coords = []

    def __repr__(self):
        return 'Ship(%s, %s, %s)' % (self.name, self.size, self.fields_intact)
class GameProxy(object):
    """
    Convenience proxy for accessing Game instances in the perspective of one player.
    All methods in Game take the session token of the current player as parameter.
    This object enables one to omit this parameter.
    """

    def __init__(self, game, player_token):
        """
        :param game: game to build the proxy for
        :param player_token: the player to create the proxy for.
        """
        self.__player_token = player_token
        self.__game = game

    def __getattr__(self, item):
        """
        Forwards attribute access to the wrapped game.

        Bound methods are returned with the player token already supplied as
        first argument; every other attribute is returned unchanged.

        :param item: name of the attribute to look up on the game
        :raises AttributeError: if the game has no such attribute, or if the
            proxy itself has not been initialised yet
        """
        # __getattr__ only runs when the normal lookup failed. If one of the
        # proxy's own (name-mangled) attributes is missing, e.g. on an instance
        # created without __init__, reading self.__game below would re-enter
        # this method endlessly. Fail with a plain AttributeError instead.
        if item.startswith('_GameProxy__'):
            raise AttributeError(item)
        attr = getattr(self.__game, item)
        if isinstance(attr, MethodType):
            return functools.partial(attr, self.__player_token)
        return attr
class Game(object):
    """ Represents the game logic for two players identified by their tokens """

    # Size of the grid/field (each player's grid is GRID_SIZE x GRID_SIZE)
    GRID_SIZE = 6

    # Initial list of number of ships available for each type as list of tuples (ship, count)
    AVAILABLE_SHIPS = [
        (Ship('Battleship', 4), 1),
        (Ship('Destroyer', 3), 1),
        (Ship('Submarine', 2), 2),
    ]

    def __init__(self, p1_token, p2_token):
        """
        :param p1_token: unique token/identifier for player 1
        :param p2_token: unique token/identifier for player 2
        """
        self.p1_token = p1_token
        self.p2_token = p2_token

        p1_ships = Game.generate_ships_to_place()
        p2_ships = Game.generate_ships_to_place()

        # Maps each player to the other player.
        self.opponent = {
            p1_token: p2_token,
            p2_token: p1_token,
        }
        # Queue of ships still waiting to be placed. This is a separate deque
        # holding the same Ship objects as own_ships, so popping from it does
        # not shrink own_ships.
        self.ships_to_place = {
            p1_token: collections.deque(p1_ships),
            p2_token: collections.deque(p2_ships),
        }
        # All ships of a player, placed or not.
        self.own_ships = {
            p1_token: p1_ships,
            p2_token: p2_ships,
        }
        self.grids = {
            p1_token: Grid(Game.GRID_SIZE),
            p2_token: Grid(Game.GRID_SIZE),
        }
        # Per player: list of (coord, previous content of that field) for each shot fired.
        self.moves_done = {
            p1_token: [],
            p2_token: [],
        }
        # Per player: future that is resolved once it is that player's turn.
        self.futures = {
            p1_token: tornado.concurrent.Future(),
            p2_token: tornado.concurrent.Future()
        }
        # Player 1 always makes the first move.
        self.whose_turn = self.p1_token

    def get_opponent(self, player_token):
        """
        :param player_token: unique player token
        :return: token of the other player
        """
        return self.opponent[player_token]

    def get_own_ship_coords(self, player_token):
        """
        :param player_token: unique player token
        :return: flat list with the coordinates of all ships of the given player
        """
        own_ships = self.own_ships[player_token]
        return list(itertools.chain.from_iterable(ship.coords for ship in own_ships))

    def get_opponent_ship_coords(self, player_token):
        """
        :param player_token: unique player token
        :return: flat list with the coordinates of all ships of the player's opponent
        """
        opponent = self.get_opponent(player_token)
        return self.get_own_ship_coords(opponent)

    def get_game_state(self, player_token):
        """
        :param player_token: unique player token
        :return: GameState
        """
        state = GameState.wait
        if not sum((ship.fields_intact for ship in self.own_ships[player_token])):
            # No intact field left on any own ship.
            state = GameState.lost
        elif not sum((ship.fields_intact for ship in self.own_ships[self.opponent[player_token]])):
            state = GameState.won
        elif self.all_ships_placed() and self.is_players_turn(player_token):
            state = GameState.canPlay
        return state

    def all_ships_placed(self, player_token=None):
        """
        :param player_token: ignored; accepted only so that this method can be
            called through GameProxy, which passes the player token as first
            argument to every method
        :return: true if both players have placed all of their ships
        """
        return not self.ships_to_place[self.p1_token] and not self.ships_to_place[self.p2_token]

    def is_players_turn(self, player_token):
        """
        :param player_token: unique player token
        :return: true if it is the players turn; false if it is not
        """
        return self.whose_turn == player_token

    def wait_for_turn(self, player_token):
        """
        :param player_token: unique player token
        :return: the future currently stored for the given player
        """
        return self.futures[player_token]

    def get_ship_to_place(self, player_token):
        """
        Returns instance of the ship that is to be placed next by the given player
        :param player_token: unique player token
        :return: instance of Ship, None if all ships have been placed
        """
        if self.ships_to_place[player_token]:
            return self.ships_to_place[player_token][0]
        else:
            return None

    def place_ship(self, player_token, top_left_coord, orientation):
        """
        Places the *current* ship (returned by get_ship_to_place)
        :param player_token: unique player token
        :param top_left_coord: top left Coord of the ship on the field
        :param orientation: Orientation of the ship on the field, e.g. Orientation.vertical
        :raises OccupiedFieldsException if fields are blocked by other ships
        :raises IndexError if coord out of range
        :raises Exception if the player has no ship left to place
        """
        ship_to_place = self.get_ship_to_place(player_token)
        if ship_to_place is None:
            # Fail explicitly instead of with an AttributeError on None.size.
            raise Exception('No ship left to place')
        grid = self.grids[player_token]

        if orientation is Orientation.horizontal:
            coord_range_x = range(top_left_coord.x, top_left_coord.x + ship_to_place.size)
            coords = [Coord(x, top_left_coord.y) for x in coord_range_x]
        else:
            coord_range_y = range(top_left_coord.y, top_left_coord.y + ship_to_place.size)
            coords = [Coord(top_left_coord.x, y) for y in coord_range_y]

        # Only dequeue the ship after it has been put on the grid successfully.
        grid.put(ship_to_place, coords)
        self.ships_to_place[player_token].popleft()

        if self.all_ships_placed():
            # Both players are done placing: signal the first player's turn.
            self.futures[self.whose_turn].set_result(None)

    def shoot_field(self, player_token, opponent_field_coord):
        """
        shoots a field on the opponent's given by opponent_field_coord.
        :param player_token: player_token
        :param opponent_field_coord: the coordinate to shoot on the opponent's field
        :return: a ShotResult
        :raises Exception if the game state of the player is not GameState.canPlay
        """
        if self.get_game_state(player_token) != GameState.canPlay:
            raise Exception('Not your turn')

        opponent = self.opponent[player_token]
        opponent_field = self.grids[opponent]
        what_is_at_coord = opponent_field[opponent_field_coord]

        if what_is_at_coord is None:
            shot_result = ShotResult.miss
            opponent_field[opponent_field_coord] = Grid.FIELD_SHOT
        elif what_is_at_coord is Grid.FIELD_SHOT:
            shot_result = ShotResult.alreadyShot
        else:
            # Anything else stored in the grid is the ship occupying the field.
            shot_result = ShotResult.hit
            what_is_at_coord.fields_intact -= 1
            opponent_field[opponent_field_coord] = Grid.FIELD_SHOT
            if not what_is_at_coord.fields_intact:
                shot_result = ShotResult.sunk

        # Shooting an already shot field does not count as a move: the player
        # keeps the turn and nothing is recorded.
        if shot_result != ShotResult.alreadyShot:
            self.moves_done[player_token].append((opponent_field_coord, what_is_at_coord))
            # Give the shooter a fresh future for the next turn, then hand
            # the turn over and signal the opponent.
            self.futures[self.whose_turn] = tornado.concurrent.Future()
            self.whose_turn = opponent
            self.futures[self.whose_turn].set_result(None)

        return shot_result

    def get_last_opponent_move(self, player_token):
        """
        :param player_token: unique player token
        :return: tuple (coord, what was at that coord before the shot) of the
            opponent's most recent move; (None, None) if the opponent has not moved yet
        """
        opponent = self.opponent[player_token]
        if self.moves_done[opponent]:
            return self.moves_done[opponent][-1]
        return None, None

    @classmethod
    def generate_ships_to_place(cls):
        """
        :return: deque with one individual Ship instance per ship listed in AVAILABLE_SHIPS
        """
        ships_to_place = collections.deque()
        for (ship, number) in cls.AVAILABLE_SHIPS:
            for _ in range(number):
                # copying here, because we need to be able to detect that an /individual/ ship has sunk
                ships_to_place.append(copy(ship))
        return ships_to_place
class Coord(object):
    """Zero-based (x, y) position that can also be given in letter/number notation."""

    def __init__(self, x, y):
        """
        Accepts coordinates either as ('A', 1) or (*'A1') or (0, 0) or with kwargs x and y

        A string x is treated as a column letter counted from 'A'; y is then a
        one-based row number. Any other x means both values are stored as given.
        """
        if not isinstance(x, str):
            self.x, self.y = x, y
            return
        self.x = ord(x) - ord('A')
        self.y = int(y) - 1

    def __str__(self):
        # Inverse of the letter/number form accepted by the constructor.
        column_letter = chr(ord('A') + self.x)
        return '%s%s' % (column_letter, self.y + 1)

    def __repr__(self):
        return 'Coord(*' + str(self) + ')'
class OccupiedFieldsException(Exception):
    """Raised when a ship is to be put on fields that are not empty."""

    def __init__(self, message=None, occupied_fields=None):
        """
        :param message: exception message; a default message is generated when omitted
        :param occupied_fields: list of Coord-objects denoting the fields that are occupied
        """
        if message is None:
            # Avoid an exception whose text is the literal string 'None'.
            if occupied_fields:
                message = 'Fields already occupied: %s' % (occupied_fields,)
            else:
                message = 'Fields already occupied'
        # super() must be given this class, not the base class.
        super(OccupiedFieldsException, self).__init__(message)
        self.occupied_fields = occupied_fields
class Grid(object):
    """Square field of cells addressed by objects with ``x`` and ``y`` attributes."""

    # Marker stored in a cell once it has been shot at.
    FIELD_SHOT = "SHOT"

    def __init__(self, grid_size):
        """
        :param grid_size: number of cells per side; every cell starts out as None
        """
        self.field = [[None for _ in range(grid_size)] for _ in range(grid_size)]

    @staticmethod
    def _check_not_negative(coord):
        """Rejects negative coordinates, which list indexing would silently wrap around."""
        if coord.x < 0 or coord.y < 0:
            raise IndexError('Coordinate out of grid: (%s, %s)' % (coord.x, coord.y))

    def __getitem__(self, coord):
        """
        :param coord: coordinate of the cell to read
        :raises IndexError if coord is out of the grid
        :return: content of the cell
        """
        self._check_not_negative(coord)
        return self.field[coord.x][coord.y]

    def __setitem__(self, coord, value):
        """
        :param coord: coordinate of the cell to write
        :param value: new content of the cell
        :raises IndexError if coord is out of the grid
        """
        self._check_not_negative(coord)
        self.field[coord.x][coord.y] = value

    def put(self, ship, coords):
        """
        :param ship: instance of Ship (note to properly clone objects, when placing two ships of the same type)
        :param coords: list of Coords
        :raises IndexError if ship is partially or completely out of grid
        :raises OccupiedFieldsException if field is not empty
        :return: None
        """
        # All coordinates are checked before any cell is written, so a failed
        # placement leaves the grid untouched.
        occupied_fields = [coord for coord in coords if self[coord] is not None]
        if occupied_fields:
            raise OccupiedFieldsException(occupied_fields=occupied_fields)
        for coord in coords:
            self[coord] = ship
        ship.coords = list(coords)
        print('Put %s at %s' % (str(ship), coords))