import torch
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix
from sklearn.manifold import TSNE

from data_fns import load_data
from plot_fns import label_plot, plot_losses, plot_embeddings
from networks import (EmbeddingEncoder, PairwiseEmbeddingEncoder,
                      TripletEmbeddingEncoder, VAE, PairwiseVAE, TripletVAE)
from utils import PairGenerator, TripletGenerator
from losses import (ContrastiveLoss, TripletLoss, PairwiseVAELoss,
                    TripletVAELoss, reconstruction_error)
from training import train_network

def run_knn(trainX, trainY, testX, testY, labels=['dog', 'cat', 'snake', 'lizard'], experiment_name=""):
    """Fit a 5-nearest-neighbour classifier on the given features and report
    train/test accuracy and confusion matrices for the named experiment."""
    print("========= KNN Results: %s ==========" % experiment_name)
    knn = KNeighborsClassifier(n_neighbors=5, n_jobs=-1)
    knn.fit(trainX, trainY)
    train_pred = knn.predict(trainX)
    test_pred = knn.predict(testX)
    train_acc = knn.score(trainX, trainY)
    test_acc = knn.score(testX, testY)
    train_conf_mat = confusion_matrix(trainY, train_pred, labels=labels)
    test_conf_mat = confusion_matrix(testY, test_pred, labels=labels)
    print("Training Accuracy: ", train_acc)
    print("Training Confusion Matrix: \n ", train_conf_mat)
    print("Test Accuracy: ", test_acc)
    print("Test Confusion Matrix: \n ", test_conf_mat)
    print('====================================================')
    return {'train_acc': train_acc, 'test_acc': test_acc,
            'train_conf_mat': train_conf_mat, 'test_conf_mat': test_conf_mat}
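

# Illustrative usage of run_knn on synthetic data (a hedged sketch, not part
# of the original experiments; the 4-class setup below is assumed purely for
# demonstration):
#
#   rng = np.random.default_rng(0)
#   X = rng.normal(size=(200, 8))          # 200 samples, 8 features
#   y = rng.integers(0, 4, size=200)       # 4 numeric classes
#   out = run_knn(X[:150], y[:150], X[150:], y[150:], labels=[0, 1, 2, 3])
#   out['test_acc']                        # held-out KNN accuracy
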
def _experiment(network, latent_dim, datagen, metric_loss, reconstruction_loss,
                trainX, trainY, testX, testY, keys, mu,
                dataset, trainLabels, testLabels,
                num_iters=20000, batch_size=64, device='cpu'):
    """Runs a single embedding-encoder experiment end to end: trains the
    network, plots the losses, embeds the data, and evaluates the latent
    space with KNN.

    Args:
        network: network being evaluated.
        latent_dim (int): dimensionality of the latent space.
        datagen: data generator object (either PairGenerator or TripletGenerator).
        metric_loss: metric loss applied to the latent representations.
        reconstruction_loss: reconstruction loss to be applied.
        trainX: training data.
        trainY: training labels.
        testX: test data.
        testY: test labels.
        keys (dict): mapping from numeric categories in trainY/testY to label names.
        mu (float): balancing parameter between reconstruction and metric losses.
        dataset (str): name of the dataset.
        trainLabels: labels associated with trainY, in case composite labels were used for training.
        testLabels: labels associated with testY, in case composite labels were used for training.
        num_iters (int, optional): number of training iterations. Defaults to 20000.
        batch_size (int, optional): batch size. Defaults to 64.
        device (str, optional): device used for training. Defaults to 'cpu'.

    Returns:
        dict with keys:
            model: trained model.
            latent_info: dictionary with keys [dim, mu, dataset, train_data,
                train_labels, test_data, test_labels, keys, plot_data].
            latent_results: output of run_knn on the latents, with keys
                [train_acc, test_acc, train_conf_mat, test_conf_mat].
    """
    # Default optimizer: Adam.
    optimizer = optim.Adam(network.parameters(), lr=0.0001)
    # Train the model; the returned lists hold the per-iteration loss histories.
    model, total_losses, recon_losses, metric_losses = train_network(
        network, datagen, num_iters, metric_loss=metric_loss,
        reconstruction_loss=reconstruction_loss,
        optimizer=optimizer, mu=mu, contractive=False,
        device=device)
    # Plot losses.
    plot_losses(total_losses, metric_losses, recon_losses, num_iters)
    # Get predictions.
    model.eval()
    model = model.cpu()
    with torch.no_grad():
        latent, reconstructed = model.get_full_pass(testX)
        latent_train, reconstructed_train = model.get_full_pass(trainX)
    # Create dictionary for the latent data.
    latent = latent.cpu().numpy()
    latent_train = latent_train.cpu().numpy()
    testLabels = testLabels.cpu().numpy()
    train_labels = trainLabels.cpu().numpy()
    latent_info = {"dim": latent_dim,
                   "mu": mu,
                   "dataset": dataset,
                   "train_data": latent_train,
                   "train_labels": train_labels,
                   "test_data": latent,
                   "test_labels": testLabels,
                   "keys": keys}
    if latent_dim == 2:
        print('latent dim = 2, plotting directly.')
        latent_info['plot_data'] = latent
    else:
        print("latent dim > 2, fitting t-SNE...")
        tsne = TSNE(n_components=2, n_iter=3000, n_jobs=-1)
        print(latent.shape)
        tsne_projection = tsne.fit_transform(latent)
        latent_info['plot_data'] = tsne_projection
    # Plot a sample reconstruction if the dataset is MNIST.
    if dataset == 'mnist':
        fig = plt.figure()
        ax1 = fig.add_subplot(1, 2, 1)
        ax1.imshow(testX[0].cpu().numpy().reshape(28, 28))
        ax1.set_title("Original Image")
        ax2 = fig.add_subplot(1, 2, 2)
        np_arr = reconstructed.cpu().numpy()[0]
        ax2.imshow(np_arr.reshape(28, 28))
        ax2.set_title("Reconstructed Image")
        plt.show()
    # Plot the embedding.
    plot_embeddings(latent_info)
    # KNN on the embedding; move labels off the GPU before handing them to sklearn.
    if torch.is_tensor(trainY):
        trainY = trainY.cpu().numpy()
    if torch.is_tensor(testY):
        testY = testY.cpu().numpy()
    latent_results = run_knn(latent_train, trainY, latent, testY,
                             labels=sorted(set(testY.flatten())),
                             experiment_name=dataset + ' mu=' + str(mu) + ' latent=' + str(latent_dim))
    if dataset == 'mnist':
        label_plot(testX.cpu().numpy(), latent_info['plot_data'], testY, 200, mu)
    results = {"model": model,
               "latent_info": latent_info,
               "latent_results": latent_results}
    return results
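

# Hedged sketch of consuming _experiment's return value (argument values are
# illustrative; in practice this function is driven by run_experiment below):
#
#   out = _experiment(network, 2, datagen, metric_loss, reconstruction_error,
#                     trainX, trainY, testX, testY, keys, mu=0.5,
#                     dataset='mnist', trainLabels=trainLabels, testLabels=testLabels)
#   out['latent_info']['test_data']     # latent test embeddings, shape (n_test, 2)
#   out['latent_results']['test_acc']   # KNN accuracy in the latent space
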
def initialize_network(sim_type, network_type, input_dim, latent_dim, dropout_val=0, device=None):
    """Helper that builds the appropriate network for the given parameters.

    Args:
        sim_type (str): "pairwise" or "triplet".
        network_type (str): "vae" or "embenc".
        input_dim (int): input dimension.
        latent_dim (int): latent dimension.
        dropout_val (float, optional): dropout value. Defaults to 0.
        device (optional): device the network should be built for.
    """
    print("sim type: ", sim_type)
    print("network_type: ", network_type)
    if network_type == 'vae':
        base_vae = VAE(input_dim, latent_dim, dropout=dropout_val, device=device)
        if sim_type == 'pairwise':
            network = PairwiseVAE(base_vae)
        elif sim_type == 'triplet':
            network = TripletVAE(base_vae)
        else:
            raise ValueError("sim_type must be 'pairwise' or 'triplet', got %r" % sim_type)
    else:
        base_network = EmbeddingEncoder(input_dim=input_dim,
                                        latent_dim=latent_dim,
                                        dropout_val=dropout_val)
        if sim_type == 'pairwise':
            network = PairwiseEmbeddingEncoder(base_network)
        elif sim_type == 'triplet':
            network = TripletEmbeddingEncoder(base_network)
        else:
            raise ValueError("sim_type must be 'pairwise' or 'triplet', got %r" % sim_type)
    return network
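

# Example (illustrative dimensions, assuming flattened 28x28 inputs): build a
# pairwise embedding encoder with a 2-d latent space.
#
#   net = initialize_network('pairwise', 'embenc', input_dim=784, latent_dim=2,
#                            dropout_val=0.2)
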
def run_experiment(sim_type, network_type, dataset, mus, latent_dims,
                   batch_size=64, keys=None, composite_labels=None,
                   margin=1, dropout_val=0.2, num_iters=20000,
                   normalize=True, feature_extractor='hog'):
    """Overarching function that runs an experiment, calling _experiment for
    each combination of mu and latent dimension.

    Args:
        sim_type (str): Simulation type. Accepts either "pairwise" or "triplet".
        network_type (str): Network type. Accepts either "embenc" or "vae".
        dataset (str): Dataset name. Accepts "mnist", "cifar10", and "imagenet".
        mus (list): list of mu values for which the experiment is to be run.
        latent_dims (list): list of latent dimensions for which the experiment is to be run.
        batch_size (int, optional): batch size from which pairs/triplets are generated. Defaults to 64.
        keys (dict, optional): Mapping from numeric labels to string label names. Defaults to None.
        composite_labels (dict, optional): Maps current labels to composite labels. Defaults to None.
        margin (int, optional): Margin to be used for the metric loss. Defaults to 1.
        dropout_val (float, optional): dropout value for the networks. Defaults to 0.2.
        num_iters (int, optional): number of training iterations. Defaults to 20000.
        normalize (bool, optional): whether load_data normalizes the data. Defaults to True.
        feature_extractor (str, optional): feature extractor used by load_data. Defaults to 'hog'.

    Returns:
        dict: mapping from (mu, latent_dim) to the _experiment results for those parameters.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print("loading data...", end='')
    trainX, trainY, testX, testY, trainLabels, testLabels = load_data(dataset, composite_labels=composite_labels,
                                                                      normalize=normalize,
                                                                      feature_extractor=feature_extractor)
    print("done.")
    # Instantiate the generator and metric loss based on the similarity type.
    print("initializing data generators and losses...", end='')
    input_dim = trainX.shape[-1]
    if sim_type == 'pairwise':
        datagen = PairGenerator(trainX, trainY, batch_size)
        metric_loss = ContrastiveLoss(margin) if network_type != 'vae' else PairwiseVAELoss(margin)
    elif sim_type == 'triplet':
        datagen = TripletGenerator(trainX, trainY, batch_size)
        metric_loss = TripletLoss(margin) if network_type != 'vae' else TripletVAELoss(margin)
    else:
        raise ValueError("sim_type must be 'pairwise' or 'triplet', got %r" % sim_type)
    reconstruction_loss = reconstruction_error
    print("done.")
    all_results = {}
    for mu in mus:
        for latent_dim in latent_dims:
            network = initialize_network(sim_type, network_type, input_dim, latent_dim, dropout_val, device=device)
            results = _experiment(network, latent_dim, datagen, metric_loss, reconstruction_loss,
                                  trainX, trainY, testX, testY, keys, mu,
                                  dataset, trainLabels, testLabels,
                                  num_iters=num_iters, batch_size=batch_size,
                                  device=device)
            all_results[(mu, latent_dim)] = results
    return all_results
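

# Minimal usage sketch (an assumption for illustration, not part of the
# original training pipeline): run a short pairwise embedding-encoder
# experiment on MNIST and print the latent-space KNN test accuracy.
if __name__ == '__main__':
    all_results = run_experiment(sim_type='pairwise',
                                 network_type='embenc',
                                 dataset='mnist',
                                 mus=[0.5],
                                 latent_dims=[2],
                                 num_iters=1000)  # small num_iters keeps this a quick smoke test
    for (mu, latent_dim), res in all_results.items():
        print("mu=%s latent_dim=%s test_acc=%s"
              % (mu, latent_dim, res['latent_results']['test_acc']))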