-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrandomized_value_iteration.py
71 lines (54 loc) · 2.03 KB
/
randomized_value_iteration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import numpy as np
import time
def apx_trans(mdp, u, M, i, a, eps, delta):
    """ Approximate Transition
        - give an eps-approximation of p_a(i)^T u by sampling

    Draws m successor states with ``mdp.step(i, a)`` and returns the mean of
    ``u[j, 0]`` over the sampled states j, where
    ``m = int(2 * M**2 / eps**2 * log(2 / delta)) + 1``
    (this looks like a Hoeffding-style sample size for values bounded by M
    -- TODO confirm against the reference algorithm).

    Parameters:
        mdp:   object exposing ``step(i, a)``, which must return a pair whose
               first item is usable as a row index into ``u``; the second
               item is ignored here.
        u:     2-D array indexed as ``u[j, 0]``.
        M:     upper bound on the entries of ``u`` (checked with an assert).
        i, a:  state and action forwarded to ``mdp.step``.
        eps:   target accuracy used in the sample-count formula.
        delta: failure probability used in the sample-count formula.

    Returns:
        The sample mean of ``u[j, 0]`` over the m draws.
    """
    assert np.max(u) <= M

    # Number of draws; evaluated left to right exactly as 2*M^2 / eps^2 * ln(2/delta).
    scale = 2 * M**2 / (eps**2)
    n_samples = int(scale * np.log(2 / delta)) + 1

    total = 0
    for _ in range(n_samples):
        next_state, _ = mdp.step(i, a)
        total += u[next_state, 0]
    return total / n_samples
def exacte_trans(mdp, u, M, i, a, eps, delta):
    """ Exact Transition
        - compute p_a(i)^T u directly from the transition tensor

    Shares the signature of ``apx_trans``; ``M``, ``eps`` and ``delta`` are
    accepted but not used.

    Returns:
        ``mdp.transition[i, a, :]`` dotted with the first column of ``u``.
    """
    transition_row = mdp.transition[i, a, :]
    first_column = u[:, 0]
    return transition_row.dot(first_column)
def apx_val(mdp, u, v0, x, eps, delta):
    """ Approximate Value Operator
        - Compute policy pi and value function v using value iteration method
          with approximated transition p_a(i).v_i

    For every state i and action a the action value is
    ``mdp.gamma * (x[i, a] + apx_trans(mdp, u - v0, ...)) + mdp.rewards[i, a]``;
    ``v[i]`` is the maximum over actions and ``pi[i]`` the maximising action.

    Parameters:
        mdp:   object exposing ``nb_s``, ``nb_a``, ``gamma``, ``rewards[i, a]``
               and whatever ``apx_trans`` needs.
        u:     current value estimate; ``u - v0`` is handed to ``apx_trans``.
        v0:    reference value function subtracted from ``u``.
        x:     array indexed as ``x[i, a]``, added to the sampled estimate
               (presumably the exact p_a(i)^T v0 -- confirm with the caller).
        eps:   accuracy forwarded to ``apx_trans``.
        delta: failure probability; divided by ``nb_s * nb_a`` before being
               forwarded to each ``apx_trans`` call.

    Returns:
        (v, pi): two float arrays of shape ``(nb_s, 1)``; ``pi`` stores the
        argmax action index as a float.
    """
    # u - v0 does not depend on (i, a): build it once instead of once per
    # apx_trans call inside the double loop.
    diff = u - v0
    M = np.max(np.abs(diff))
    delta2 = delta / (mdp.nb_s * mdp.nb_a)

    v = np.zeros((mdp.nb_s, 1))
    pi = np.zeros((mdp.nb_s, 1))
    for i in range(mdp.nb_s):
        q_values = np.zeros(mdp.nb_a)
        for a in range(mdp.nb_a):
            estimate = apx_trans(mdp, diff, M, i, a, eps, delta2)
            q_values[a] = mdp.gamma * (x[i, a] + estimate)
            q_values[a] += mdp.rewards[i, a]
        v[i, 0] = np.max(q_values)
        pi[i, 0] = np.argmax(q_values)
    return v, pi
def randomizedVI(mdp, v0, L, eps, delta, analyze=False):
    """ Randomized Value Iteration
        - Apply apx_val L times starting from v0 and return the last value
          function and policy

    ``x[i, a] = mdp.transition[i, a, :] . v0`` is computed exactly once up
    front and passed to every ``apx_val`` call; each call receives
    ``delta / L`` as its failure probability.

    Parameters:
        mdp:     object exposing ``nb_s``, ``nb_a``, ``transition[i, a, :]``
                 and whatever ``apx_val`` needs.
        v0:      initial value function (array with ``nb_s`` entries; a
                 ``(nb_s, 1)`` column is what ``apx_val`` returns and so what
                 later iterations subtract it from).
        L:       number of iterations, must be >= 1.
        eps:     accuracy forwarded to ``apx_val``.
        delta:   overall failure probability.
        analyze: when true, print timings and record ``[m, duration]`` per
                 iteration.

    Returns:
        (v, pi, m_hist): the value function and policy from the final
        iteration, and ``m_hist`` as a NumPy array (empty unless ``analyze``).

    Raises:
        ValueError: if ``L`` is smaller than 1 (no iteration would run, so
            there would be no value function or policy to return).
    """
    if L < 1:
        raise ValueError("L must be at least 1, got {}".format(L))

    m_hist = []

    start_time_x = time.time()
    # Flatten v0 so each dot product is a scalar; a (nb_s, 1) column would
    # otherwise yield a (nb_a, 1) block that cannot be assigned into a row.
    v0_flat = np.asarray(v0).reshape(-1)
    x = np.zeros((mdp.nb_s, mdp.nb_a))
    for i in range(mdp.nb_s):
        x[i, :] = [mdp.transition[i, a, :].dot(v0_flat)
                   for a in range(mdp.nb_a)]
    if analyze:
        print("{} sec to compute x=p^Tv".format(round(time.time() - start_time_x,4)))

    v_prev = v0.copy()
    for it in range(L):
        start_time_l = time.time()
        v_l, pi_l = apx_val(mdp, v_prev, v0, x, eps, delta / L)
        v_prev = v_l
        if analyze:
            duration_l = time.time() - start_time_l
            # NOTE(review): this reported m uses the updated v_prev and the
            # undivided delta, so it may differ from the sample count that
            # apx_trans actually used in this iteration -- confirm intent.
            m = int(2 * np.max(np.abs(v_prev - v0))**2 / (eps**2) \
                    * np.log(2 / delta)) + 1
            print("Iteration l={}, |S||A|*{} iterations of ApxTrans in {} sec".format(
                it, m, round(duration_l,4)))
            m_hist.append([m, duration_l])
    return v_l, pi_l, np.array(m_hist)