Qlearning.py
#! /usr/bin/env python
import numpy as np
from math import *
from std_msgs.msg import String
from itertools import product
from sensor_msgs.msg import LaserScan

STATE_SPACE_IND_MAX = 144 - 1   # 144 discrete states (3 x 3 x 4 x 4), zero-indexed
STATE_SPACE_IND_MIN = 1 - 1
ACTIONS_IND_MAX = 2             # 3 actions: 0 = forward, 1 and 2 = turns
ACTIONS_IND_MIN = 0
ANGLE_MAX = 360 - 1             # lidar scan with 360 beams, zero-indexed
ANGLE_MIN = 1 - 1
HORIZON_WIDTH = 75              # beams on each side of scan angle 0 used in the reward
T_MIN = 0.001                   # minimum softmax temperature

# Create actions
def createActions():
    actions = np.array([0,1,2])
    return actions

# Create state space for Q table
def createStateSpace():
    x1 = set((0,1,2))
    x2 = set((0,1,2))
    x3 = set((0,1,2,3))
    x4 = set((0,1,2,3))
    state_space = set(product(x1,x2,x3,x4))
    return np.array(list(state_space))
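
# Illustrative only (not part of the original module): an observed state tuple
# (x1, x2, x3, x4) can be mapped to its Q-table row by matching it against the
# array returned above. The project's actual lookup lives in its scan-processing
# code; the names below are placeholders.
#   state_space = createStateSpace()
#   state = np.array([1, 0, 2, 3])
#   state_ind = int(np.where(np.all(state_space == state, axis = 1))[0][0])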

# Create Q table, dim: n_states x n_actions
def createQTable(n_states, n_actions):
    #Q_table = np.random.uniform(low = -0.05, high = 0, size = (n_states,n_actions) )
    Q_table = np.zeros((n_states, n_actions))
    return Q_table

# Read Q table from path
def readQTable(path):
    Q_table = np.genfromtxt(path, delimiter = ' , ')
    return Q_table

# Write Q table to path
def saveQTable(path, Q_table):
    np.savetxt(path, Q_table, delimiter = ' , ')
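
# Illustrative only: a save/load round trip with a placeholder file name.
#   Q_table = createQTable(144, 3)
#   saveQTable('Qtable.csv', Q_table)   # 'Qtable.csv' is a hypothetical path
#   Q_table = readQTable('Qtable.csv')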

# Select the greedy (best) action a for the state with index state_ind
def getBestAction(Q_table, state_ind, actions):
    if STATE_SPACE_IND_MIN <= state_ind <= STATE_SPACE_IND_MAX:
        status = 'getBestAction => OK'
        a_ind = np.argmax(Q_table[state_ind,:])
        a = actions[a_ind]
    else:
        status = 'getBestAction => INVALID STATE INDEX'
        a = getRandomAction(actions)
    return ( a, status )

# Select random action from actions
def getRandomAction(actions):
    n_actions = len(actions)
    a_ind = np.random.randint(n_actions)
    return actions[a_ind]

# Epsilon-greedy exploration action choice
def epsiloGreedyExploration(Q_table, state_ind, actions, epsilon):
    # With probability (1 - epsilon) exploit the greedy action, otherwise explore randomly
    if np.random.uniform() > epsilon and STATE_SPACE_IND_MIN <= state_ind <= STATE_SPACE_IND_MAX:
        status = 'epsiloGreedyExploration => OK'
        ( a, status_gba ) = getBestAction(Q_table, state_ind, actions)
        if status_gba == 'getBestAction => INVALID STATE INDEX':
            status = 'epsiloGreedyExploration => INVALID STATE INDEX'
    else:
        status = 'epsiloGreedyExploration => OK'
        a = getRandomAction(actions)
    return ( a, status )

# Softmax (Boltzmann) action selection
def softMaxSelection(Q_table, state_ind, actions, T):
    if STATE_SPACE_IND_MIN <= state_ind <= STATE_SPACE_IND_MAX:
        status = 'softMaxSelection => OK'
        n_actions = len(actions)
        P = np.zeros(n_actions)
        # Boltzmann distribution over actions: P(a) = exp(Q(s,a)/T) / sum_b exp(Q(s,b)/T)
        P = np.exp(Q_table[state_ind,:] / T) / np.sum(np.exp(Q_table[state_ind,:] / T))

        # Fall back to the greedy action if the temperature is too low or P is ill-defined
        if T < T_MIN or np.any(np.isnan(P)):
            ( a, status_gba ) = getBestAction(Q_table, state_ind, actions)
            if status_gba == 'getBestAction => INVALID STATE INDEX':
                status = 'softMaxSelection => INVALID STATE INDEX'
        else:
            # Sample an action from P with a roulette-wheel draw
            rnd = np.random.uniform()
            status = 'softMaxSelection => OK'
            if P[0] > rnd:
                a = 0
            elif P[0] <= rnd and (P[0] + P[1]) > rnd:
                a = 1
            elif (P[0] + P[1]) <= rnd:
                a = 2
            else:
                status = 'softMaxSelection => Boltzmann distribution error => getBestAction'
                status = status + '\r\nP = (%f , %f , %f) , rnd = %f' % (P[0],P[1],P[2],rnd)
                status = status + '\r\nQ(%d,:) = ( %f, %f, %f) ' % (state_ind,Q_table[state_ind,0],Q_table[state_ind,1],Q_table[state_ind,2])
                ( a, status_gba ) = getBestAction(Q_table, state_ind, actions)
                if status_gba == 'getBestAction => INVALID STATE INDEX':
                    status = 'softMaxSelection => INVALID STATE INDEX'
    else:
        status = 'softMaxSelection => INVALID STATE INDEX'
        a = getRandomAction(actions)
    return ( a, status )
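
# Illustrative only (not part of the original module): during training one of the two
# selection strategies above is typically called once per step, with epsilon or the
# temperature T annealed between episodes. The schedule below is a placeholder.
#   epsilon, T = 0.9, 25.0
#   ( action, status ) = epsiloGreedyExploration(Q_table, state_ind, actions, epsilon)
#   # or:  ( action, status ) = softMaxSelection(Q_table, state_ind, actions, T)
#   epsilon = max(0.05, 0.998 * epsilon)
#   T = max(T_MIN, 0.998 * T)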

# Reward function for the Q-learning table
def getReward(action, prev_action, lidar, prev_lidar, crash):
    if crash:
        terminal_state = True
        reward = -100
    else:
        # Lidar beams within HORIZON_WIDTH degrees on either side of scan angle 0
        lidar_horizon = np.concatenate((lidar[(ANGLE_MIN + HORIZON_WIDTH):(ANGLE_MIN):-1],lidar[(ANGLE_MAX):(ANGLE_MAX - HORIZON_WIDTH):-1]))
        prev_lidar_horizon = np.concatenate((prev_lidar[(ANGLE_MIN + HORIZON_WIDTH):(ANGLE_MIN):-1],prev_lidar[(ANGLE_MAX):(ANGLE_MAX - HORIZON_WIDTH):-1]))
        terminal_state = False
        # Reward from the action taken: forward -> +0.2 , turn -> -0.1
        if action == 0:
            r_action = +0.2
        else:
            r_action = -0.1
        # Reward from the change of distance to obstacles, weighted towards the centre of the horizon
        W = np.linspace(0.9, 1.1, len(lidar_horizon) // 2)
        W = np.append(W, np.linspace(1.1, 0.9, len(lidar_horizon) // 2))
        if np.sum( W * ( lidar_horizon - prev_lidar_horizon) ) >= 0:
            r_obstacle = +0.2
        else:
            r_obstacle = -0.2
        # Penalty for oscillating between left and right turns
        if ( prev_action == 1 and action == 2 ) or ( prev_action == 2 and action == 1 ):
            r_change = -0.8
        else:
            r_change = 0.0
        # Cumulative reward
        reward = r_action + r_obstacle + r_change
    return ( reward, terminal_state )
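
# Worked example (values taken directly from the branches above): a forward step
# (action 0) while the weighted distance to obstacles grew and no left/right flip
# occurred yields reward = +0.2 + 0.2 + 0.0 = +0.4; any step that ends in a crash
# yields reward = -100 and marks the state as terminal.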

# Update Q-table values with the standard Q-learning rule:
# Q(s,a) <- (1 - alpha) * Q(s,a) + alpha * ( r + gamma * max_a' Q(s',a') )
def updateQTable(Q_table, state_ind, action, reward, next_state_ind, alpha, gamma):
    if STATE_SPACE_IND_MIN <= state_ind <= STATE_SPACE_IND_MAX and STATE_SPACE_IND_MIN <= next_state_ind <= STATE_SPACE_IND_MAX:
        status = 'updateQTable => OK'
        Q_table[state_ind,action] = ( 1 - alpha ) * Q_table[state_ind,action] + alpha * ( reward + gamma * max(Q_table[next_state_ind,:]) )
    else:
        status = 'updateQTable => INVALID STATE INDEX'
    return ( Q_table, status )
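
# Minimal self-contained sketch (not part of the original node): one Q-learning step
# wired together from the functions above. In the real project these calls are driven
# by a ROS node with live LaserScan data; the scan arrays, state indices and
# hyperparameters below are fabricated placeholders for illustration only.
if __name__ == '__main__':
    actions = createActions()
    state_space = createStateSpace()
    Q_table = createQTable(len(state_space), len(actions))

    # Placeholder scans: 360 beams, every obstacle 1 m away
    lidar = np.ones(ANGLE_MAX + 1)
    prev_lidar = np.ones(ANGLE_MAX + 1)

    state_ind, next_state_ind = 0, 1          # placeholder state indices
    prev_action = 0
    alpha, gamma, epsilon = 0.5, 0.9, 0.9     # placeholder hyperparameters

    ( action, status ) = epsiloGreedyExploration(Q_table, state_ind, actions, epsilon)
    ( reward, terminal_state ) = getReward(action, prev_action, lidar, prev_lidar, False)
    ( Q_table, status ) = updateQTable(Q_table, state_ind, action, reward, next_state_ind, alpha, gamma)
    print('action = %d , reward = %.2f , Q(%d,%d) = %.4f' % (action, reward, state_ind, action, Q_table[state_ind, action]))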