-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkernelSearch.py
156 lines (146 loc) · 7.96 KB
/
kernelSearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import gpytorch as gpt
import torch
from GaussianProcess import ExactGPModel
from helpFunctions import get_string_representation_of_kernel as gsr, clean_kernel_expression
from helpFunctions import amount_of_base_kernels
import threading
import copy
from globalParams import options
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------HELP FUNCTIONS----------------------------------------------
# ----------------------------------------------------------------------------------------------------
def replace_internal_kernels(base, kernels):
    """Return every expression obtainable from ``base`` by substituting one
    sub-kernel with an element of ``kernels``.

    ``base`` is treated as a tree: nodes exposing a ``kernels`` list are
    operators (e.g. additive/product kernels), nodes exposing ``base_kernel``
    are wrappers (e.g. a scale kernel), anything else is a leaf.  A ``None``
    entry in ``kernels`` means "delete this sub-kernel" rather than replace it.

    NOTE(review): each returned expression is a deepcopy of ``base`` with one
    node swapped — except the leaf case, where ``ret = kernels`` aliases the
    caller's list, so ``clean_kernel_expression`` below mutates the caller's
    kernel objects in place; confirm this aliasing is intended.
    """
    ret = []
    if not (hasattr(base, "kernels") or hasattr(base, "base_kernel")):
        # Leaf: the whole expression is a single base kernel, so the
        # replacements are simply the candidate kernels themselves.
        if None not in kernels:
            ret = kernels
        else:
            # Deleting a lone leaf makes no sense; drop the None marker.
            # NOTE(review): this early return skips the
            # clean_kernel_expression pass at the bottom — verify intended.
            return [k for k in kernels if k is not None]
    elif hasattr(base, "kernels"): # operators
        for position in range(len(base.kernels)):
            # Replace within the child at `position`, one variant per result.
            for k in replace_internal_kernels(base.kernels[position], kernels):
                new_expression = copy.deepcopy(base)
                new_expression.kernels[position] = k
                ret.append(new_expression)
            # None in the candidate set requests deletion of this child.
            if None in kernels:
                if len(base.kernels) > 2:
                    # Operator keeps >= 2 operands after removal: pop child.
                    new_expression = copy.deepcopy(base)
                    new_expression.kernels.pop(position)
                    ret.append(new_expression)
                else:
                    # Binary operator: deleting one operand collapses the
                    # node to the remaining child.
                    # NOTE(review): appends base.kernels[position] (the child
                    # being deleted), not the sibling — confirm intended.
                    ret.append(base.kernels[position])
    elif hasattr(base, "base_kernel"): # scaleKernel
        # Wrapper node: recurse into the single wrapped kernel.
        for k in replace_internal_kernels(base.base_kernel, kernels):
            new_expression = copy.deepcopy(base)
            new_expression.base_kernel = k
            ret.append(new_expression)
    # Normalize every generated expression (in place) before returning.
    for expression in ret:
        clean_kernel_expression(expression)
    return ret
def extend_internal_kernels(base, kernels, operations):
    """Return every expression obtainable by combining ``base`` — or one of
    its internal sub-kernels — with a candidate from ``kernels`` via one of
    the binary ``operations`` (e.g. additive/product kernel constructors).

    Nodes with a ``kernels`` list are operators, nodes with ``base_kernel``
    are wrappers; each variant is a deepcopy of ``base`` with one node
    extended, except the top-level extensions which wrap ``base`` directly.
    """
    # Top-level extensions: op(base, k) for every operation/candidate pair.
    candidates = [op(base, k) for op in operations for k in kernels]
    if hasattr(base, "kernels"):
        # Operator node: extend each child in turn.
        for idx in range(len(base.kernels)):
            for extended in extend_internal_kernels(base.kernels[idx], kernels, operations):
                variant = copy.deepcopy(base)
                variant.kernels[idx] = extended
                candidates.append(variant)
    elif hasattr(base, "base_kernel"):
        # Wrapper node: extend the single wrapped kernel.
        for extended in extend_internal_kernels(base.base_kernel, kernels, operations):
            variant = copy.deepcopy(base)
            variant.base_kernel = extended
            candidates.append(variant)
    return candidates
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------CANDIDATE FUNCTIONS-----------------------------------------
# ----------------------------------------------------------------------------------------------------
def create_candidates_CKS(base, kernels, operations):
    """Build the CKS candidate pool for one search step: all extensions of
    ``base`` (via ``operations``) followed by all single-node replacements,
    each group de-duplicated independently via a set pass.
    """
    extended = set(extend_internal_kernels(base, kernels, operations))
    replaced = set(replace_internal_kernels(base, kernels))
    return list(extended) + list(replaced)
def create_candidates_AKS(base, kernels, operations, max_complexity=5):
    """Build the AKS candidate pool for one search step.

    Extensions are generated only while ``base`` holds fewer than
    ``max_complexity`` base kernels; replacements (including deletions via
    the ``None`` marker) and ``base`` itself are always candidates.  The
    result is de-duplicated with a single set pass.
    """
    pool = []
    if max_complexity and amount_of_base_kernels(base) < max_complexity:
        pool += extend_internal_kernels(base, kernels, operations)
    pool += replace_internal_kernels(base, [None] + kernels)
    pool.append(base)
    return list(set(pool))
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------EVALUATE FUNCTIONS------------------------------------------
# ----------------------------------------------------------------------------------------------------
def evaluate_performance_via_likelihood(model):
    """Score a model by negating its current training loss, so that a
    higher score means a better fit (the loss is minimized during training).
    """
    loss = model.get_current_loss()
    return -loss
# ----------------------------------------------------------------------------------------------------
# ------------------------------------------- CKS ----------------------------------------------------
# ----------------------------------------------------------------------------------------------------
def CKS(training_data, likelihood, base_kernels, iterations, **kwargs):
    """Compositional Kernel Search.

    Greedy search over kernel expressions: each iteration trains a GP for
    every candidate, keeps the best-scoring one, and expands it (via
    addition/multiplication and replacement) into the next round's
    candidates.  Stops early once no candidate beats the incumbent.

    Parameters
    ----------
    training_data : training set handed to ExactGPModel.
    likelihood : gpytorch likelihood; deep-copied per candidate model.
    base_kernels : list of base kernels forming the initial candidates.
    iterations : maximum number of search rounds.
    **kwargs : accepted for interface compatibility; currently unused.

    Returns
    -------
    (covar_module, likelihood) of the best model found.
    """
    operations = [gpt.kernels.AdditiveKernel, gpt.kernels.ProductKernel]
    candidates = base_kernels.copy()
    best_performance = dict()
    for i in range(iterations):
        # Fresh containers each round.  The previous version appended to
        # shared dicts/lists across iterations, re-joining every already
        # finished thread each round and letting models/performance grow
        # without bound.  Resetting is semantics-preserving: the stale
        # maximum always equalled the stored incumbent performance.
        models = dict()
        performance = dict()
        threads = list()
        for k in candidates:
            key = gsr(k)  # hoisted: gsr(k) was recomputed on every use
            models[key] = ExactGPModel(training_data, copy.deepcopy(likelihood), copy.deepcopy(k))
            if options["kernel search"]["multithreading"]:
                threads.append(threading.Thread(target=models[key].optimize_hyperparameters))
                threads[-1].start()
            else:
                models[key].optimize_hyperparameters()
        for t in threads:
            t.join()
        for k in candidates:
            key = gsr(k)
            performance[key] = evaluate_performance_via_likelihood(models[key])
            if options["kernel search"]["print"]:
                # performance is the negated loss, so negate back for display.
                print(f"KERNEL SEARCH: iteration {i} checking {key}, loss {-performance[key]}")
        # Stop when the incumbent is at least as good as every new candidate.
        if len(best_performance) > 0:
            if best_performance["performance"] >= max(performance.values()):
                if options["kernel search"]["print"]:
                    print("KERNEL SEARCH: no gain through additional kernel length, stopping search")
                break
        best_model = models[max(performance, key=performance.__getitem__)]
        best_performance = {"model": best_model, "performance": max(performance.values())}
        candidates = create_candidates_CKS(best_model.covar_module, base_kernels, operations)
    if options["kernel search"]["print"]:
        print(f"KERNEL SEARCH: kernel search concluded, optimal expression: {gsr(best_model.covar_module)}")
    return best_model.covar_module, best_model.likelihood
# ----------------------------------------------------------------------------------------------------
# ------------------------------------------- AKS ----------------------------------------------------
# ----------------------------------------------------------------------------------------------------
def AKS(base_expression, training_data, likelihood, base_kernels, iterations, max_complexity=99):
    """Adjusting Kernel Search: like CKS, but starts from an existing
    ``base_expression`` and caps expression size at ``max_complexity``
    base kernels.

    Parameters
    ----------
    base_expression : kernel expression the search starts from.
    training_data : training set handed to ExactGPModel.
    likelihood : gpytorch likelihood; deep-copied per candidate model.
    base_kernels : list of base kernels used to extend/replace sub-kernels.
    iterations : maximum number of search rounds.
    max_complexity : upper bound on the number of base kernels (default 99).

    Returns
    -------
    (covar_module, likelihood) of the best model found.
    """
    operations = [gpt.kernels.AdditiveKernel, gpt.kernels.ProductKernel]
    candidates = create_candidates_AKS(base_expression, base_kernels, operations, max_complexity)
    best_performance = dict()
    for i in range(iterations):
        # Fresh containers each round — same fix as in CKS: the previous
        # version re-joined finished threads and grew models/performance
        # unboundedly across iterations (reset is semantics-preserving).
        models = dict()
        performance = dict()
        threads = list()
        for k in candidates:
            key = gsr(k)  # hoisted: gsr(k) was recomputed on every use
            models[key] = ExactGPModel(training_data, copy.deepcopy(likelihood), copy.deepcopy(k))
            if options["kernel search"]["multithreading"]:
                threads.append(threading.Thread(target=models[key].optimize_hyperparameters))
                threads[-1].start()
            else:
                models[key].optimize_hyperparameters()
        for t in threads:
            t.join()
        for k in candidates:
            key = gsr(k)
            performance[key] = evaluate_performance_via_likelihood(models[key])
            if options["kernel search"]["print"]:
                # Bug fix: the value printed as "loss" was the negated loss
                # (the likelihood score); negate it as CKS already does.
                print(f"KERNEL SEARCH: iteration {i} checking {key}, loss {-performance[key]}")
        # Stop when the incumbent is at least as good as every new candidate.
        if len(best_performance) > 0:
            if best_performance["performance"] >= max(performance.values()):
                if options["kernel search"]["print"]:
                    print("KERNEL SEARCH: no gain through additional kernel length, stopping search")
                break
        best_model = models[max(performance, key=performance.__getitem__)]
        best_performance = {"model": best_model, "performance": max(performance.values())}
        candidates = create_candidates_AKS(best_model.covar_module, base_kernels, operations, max_complexity)
    if options["kernel search"]["print"]:
        print(f"KERNEL SEARCH: kernel search concluded, optimal expression: {gsr(best_model.covar_module)}")
    return best_model.covar_module, best_model.likelihood