-
Notifications
You must be signed in to change notification settings - Fork 30
/
admm.py
115 lines (92 loc) · 3.66 KB
/
admm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Manager, cpu_count, Pool

import numpy as np
from joblib import Parallel, delayed
from numpy.linalg import inv
from numpy.linalg import norm
class SolveIndividual:
    """Local x-update for one data row in consensus ADMM."""

    def solve(self, A, b, nu, rho, Z):
        """Return the (D, 1) local solution for a single sample.

        A   -- 1-D feature row a_i
        b   -- scalar target for this row
        nu  -- (D, 1) dual-variable estimate
        rho -- augmented-Lagrangian penalty weight
        Z   -- (D, 1) consensus variable
        """
        gram = A.dot(A.T)          # scalar a_i . a_i
        col = A.reshape(-1, 1)     # row vector viewed as a column
        numerator = col * b + rho * Z - nu
        return numerator / (gram + rho)
class CombineSolution:
    """Dual-variable update for one data row in consensus ADMM."""

    def combine(self, nuBar, xBar, Z, rho):
        """Return the updated dual row as a (1, D) row vector.

        nuBar -- current dual estimate for this sample
        xBar  -- current local primal estimate for this sample
        Z     -- (D, 1) consensus variable
        rho   -- augmented-Lagrangian penalty weight
        """
        dual = nuBar.reshape(-1, 1) + rho * (xBar.reshape(-1, 1) - Z)
        return dual.T
class ADMM:
    """ADMM solver for the lasso problem

        min_x  0.5 * ||A x - b||^2 + alpha * ||x||_1

    Supports a direct serial update (``step``), plus a row-wise consensus
    variant run either with a thread pool (``step_parallel``) or
    sequentially (``step_iterative``).

    Fixes relative to the original:
    - X-update now regularizes with ``rho * I`` instead of adding the
      scalar ``rho`` to every entry of ``A.T A``.
    - Z-update is the correct soft-thresholding (prox of the l1 norm)
      applied to ``X + nu / rho``, not ``np.sign`` of the stale Z.
    - ``np.asscalar`` (removed in NumPy 1.23) replaced with ``.item()``.
    - Loops cover all N samples (``range(self.N)``); the originals
      stopped at N-2.
    - ``step_parallel`` previously spawned ``multiprocessing.Process``
      workers whose writes to ``self`` were lost in the child address
      spaces; a result-collecting thread pool is used instead.
    """

    def __init__(self, A, b, parallel=False):
        # Problem dimensions: N samples (rows of A), D features.
        self.D = A.shape[1]
        self.N = A.shape[0]
        if parallel:
            # Per-sample local primal/dual copies for the consensus updates.
            self.XBar = np.zeros((self.N, self.D))
            self.nuBar = np.zeros((self.N, self.D))
        self.nu = np.zeros((self.D, 1))    # dual variable
        self.rho = 1                       # augmented-Lagrangian penalty
        self.X = np.random.randn(self.D, 1)
        self.Z = np.zeros((self.D, 1))     # consensus variable
        self.A = A
        self.b = b
        self.alpha = 0.01                  # l1 regularization weight
        self.parallel = parallel
        self.numberOfThreads = cpu_count()

    def _soft_threshold(self, v):
        # Proximal operator of (alpha / rho) * ||.||_1 (lasso shrinkage).
        k = self.alpha / self.rho
        return np.sign(v) * np.maximum(np.abs(v) - k, 0.0)

    def step(self):
        """Run one ADMM iteration (delegates when parallel mode is on)."""
        if self.parallel:
            return self.step_parallel()
        # X-update: solve (A^T A + rho I) X = A^T b + rho Z - nu.
        lhs = self.A.T.dot(self.A) + self.rho * np.eye(self.D)
        self.X = inv(lhs).dot(self.A.T.dot(self.b) + self.rho * self.Z - self.nu)
        # Z-update: soft-threshold X + nu / rho.
        self.Z = self._soft_threshold(self.X + self.nu / self.rho)
        # Dual update.
        self.nu = self.nu + self.rho * (self.X - self.Z)

    def solveIndividual(self, i):
        """Local x-update for sample i; returns a (D, 1) column."""
        solve = SolveIndividual()
        # .item() extracts the scalar target (np.asscalar was removed).
        return solve.solve(self.A[i], self.b[i].item(),
                           self.nuBar[i].reshape(-1, 1), self.rho, self.Z)

    def combineSolution(self, i):
        """Dual update for sample i; returns a (1, D) row."""
        combine = CombineSolution()
        return combine.combine(self.nuBar[i].reshape(-1, 1),
                               self.XBar[i].reshape(-1, 1), self.Z, self.rho)

    def step_parallel(self):
        """One consensus-ADMM iteration using a thread pool.

        Threads share the parent's memory, so the per-sample results are
        actually collected (Process-based workers discarded them), and
        the maps cover every sample index.
        """
        # Local X-updates.
        with ThreadPoolExecutor(max_workers=self.numberOfThreads) as ex:
            for i, t in enumerate(ex.map(self.solveIndividual, range(self.N))):
                self.XBar[i] = t.T
        self.X = np.average(self.XBar, axis=0).reshape(-1, 1)
        self.nu = np.average(self.nuBar, axis=0).reshape(-1, 1)
        # Z-update: soft-threshold X + nu / rho.
        self.Z = self._soft_threshold(self.X + self.nu / self.rho)
        # Local dual updates.
        with ThreadPoolExecutor(max_workers=self.numberOfThreads) as ex:
            for i, t in enumerate(ex.map(self.combineSolution, range(self.N))):
                self.nuBar[i] = t

    def step_iterative(self):
        """Sequential reference implementation of the consensus iteration."""
        # Local X-updates.
        for i in range(self.N):
            t = self.solveIndividual(i)
            self.XBar[i] = t.T
        self.X = np.average(self.XBar, axis=0).reshape(-1, 1)
        self.nu = np.average(self.nuBar, axis=0).reshape(-1, 1)
        # Z-update: soft-threshold X + nu / rho.
        self.Z = self._soft_threshold(self.X + self.nu / self.rho)
        # Local dual updates.
        for i in range(self.N):
            t = self.nuBar[i].reshape(-1, 1)
            t = t + self.rho * (self.XBar[i].reshape(-1, 1) - self.Z)
            self.nuBar[i] = t.T

    def LassoObjective(self):
        """Current lasso objective 0.5 * ||A X - b||^2 + alpha * ||X||_1."""
        return 0.5 * norm(self.A.dot(self.X) - self.b) ** 2 \
            + self.alpha * norm(self.X, 1)