-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathspike_trainer_2.py
214 lines (165 loc) · 7 KB
/
spike_trainer_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# Imports
import os
os.environ["THEANO_FLAGS"] = "mode=FAST_RUN,device=gpu,floatX=float32"
import numpy
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM
from keras.callbacks import ModelCheckpoint
from keras.utils import np_utils
from keras.models import load_model
import ipdb
import sys
class spike_trainer(object):
    """Train a stacked-LSTM Keras model on sliding windows of an input signal.

    Each training example is a window of ``seq_length`` consecutive input
    samples; its target is the output sample that immediately follows the
    window. The network is LSTM(256) -> LSTM(128) -> LSTM(64) with dropout
    after each, followed by a ReLU ``Dense`` layer with one unit per output
    column, trained with MSE loss and the Adam optimizer.

    Constructing an instance builds the training arrays and creates (or
    loads) the model, which writes ``<exp_name>_model.h5`` to disk.
    """

    def __init__(self, input_signal, output_spikes, exp_name='default_exp',
                 load_weights=True, load_model=False, nb_epoch=10,
                 batch_size=128, seq_length=100):
        """Store the configuration, window the data and build the network.

        Args:
            input_signal: 2-D array indexed as (sample, input channel).
            output_spikes: 2-D array indexed as (sample, output channel).
            exp_name: Prefix for the model, weights and checkpoint files.
            load_weights: If true, try to load ``<exp_name>_weights.h5``
                after the model has been created or loaded.
            load_model: If true, try to load ``<exp_name>_model.h5`` and
                fall back to a freshly built network on failure.
            nb_epoch: Number of epochs used by ``fit_model``.
            batch_size: Batch size used by ``fit_model``.
            seq_length: Number of consecutive samples in one input window.
        """
        self.input = input_signal
        self.output = output_spikes
        self.data_dim = input_signal.shape[1]
        print("Number of Inputs %s" % self.data_dim)
        self.nb_classes = output_spikes.shape[1]
        print("Number of Outputs %s" % self.nb_classes)
        # Placeholder only; _create_network() assigns the real model below.
        self.model = None
        self.nb_epoch = nb_epoch
        self.load_weights = load_weights
        self.load_model = load_model
        self.exp_name = exp_name
        self.model_filename = self.exp_name + "_model.h5"
        self.weights_filename = self.exp_name + "_weights.h5"
        self.batch_size = batch_size
        # Honour the caller's window length (it used to be forced to 100).
        self.seq_length = seq_length
        # Keep only the checkpoints that improve the training loss.
        filepath = self.exp_name + "-{epoch:02d}-{loss:.4f}.hdf5"
        checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1,
                                     save_best_only=True, mode='min')
        self.callbacks_list = [checkpoint]
        # Filled in by _process_data().
        self.X = []
        self.y = []
        self._process_data()
        self._create_network()

    def _save_weights(self):
        """Write the current model weights to ``self.weights_filename``."""
        print("Saving weights...")
        self.model.save_weights(self.weights_filename)
        print("Saved to %s" % self.weights_filename)

    def _load_weights(self):
        """Load model weights from ``self.weights_filename``."""
        print("Loading weights...")
        self.model.load_weights(self.weights_filename)
        print("Loaded from %s" % self.weights_filename)

    def _process_data(self):
        """Build the windowed training arrays ``self.X`` and ``self.y``.

        ``self.X[i]`` holds input samples ``i .. i + seq_length - 1`` and
        ``self.y[i]`` holds output sample ``i + seq_length``. Also sets
        ``self.n_patterns`` to the number of windows.

        Raises:
            ValueError: If the output has fewer samples than ``seq_length``.
        """
        n_samples = self.output.shape[0]
        n_patterns = n_samples - self.seq_length
        if n_patterns < 0:
            raise ValueError(
                "need at least seq_length=%d samples, got %d"
                % (self.seq_length, n_samples))
        # One target column per output channel, matching the Dense layer
        # width used in _build_network().
        X = numpy.zeros((n_patterns, self.seq_length, self.input.shape[1]))
        y = numpy.zeros((n_patterns, self.nb_classes))
        for i in range(n_patterns):
            X[i, :, :] = self.input[i:i + self.seq_length]
            y[i, :] = self.output[i + self.seq_length]
        self.n_patterns = n_patterns
        self.X = X
        self.y = y

    def _build_network(self):
        """Return a freshly compiled stacked-LSTM model."""
        # Expected input data shape: (batch_size, timesteps, data_dim).
        model = Sequential()
        # The first two LSTM layers emit the full sequence so the next
        # LSTM can consume it; the last one emits a single vector.
        model.add(LSTM(256, return_sequences=True,
                       input_shape=(self.seq_length, self.data_dim)))
        model.add(Dropout(0.2))
        model.add(LSTM(128, return_sequences=True))
        model.add(Dropout(0.2))
        model.add(LSTM(64))
        model.add(Dropout(0.2))
        model.add(Dense(self.nb_classes, activation='relu'))
        model.compile(loss='mse', optimizer='adam')
        return model

    def _create_network(self):
        """Load or build ``self.model``, then optionally load saved weights.

        A failed model or weights load is reported on stdout and is not
        fatal: a new network is built, or the current weights are kept.
        """
        if self.load_model:
            try:
                print("Loading Model")
                # Module-level Keras load_model, not the boolean attribute.
                self.model = load_model(self.model_filename)
                print("Loaded from %s" % self.model_filename)
            except Exception:
                print("Failed to load model")
                self.model = self._build_network()
                self.model.save(self.model_filename)
                print("Created new model")
        else:
            self.model = self._build_network()
            print("Created new model")
            self.model.save(self.model_filename)
        if self.load_weights:
            try:
                self._load_weights()
            except Exception:
                # Best effort: e.g. no weights file yet on a first run.
                print("Failed to load weights")

    def fit_model(self):
        """Train the model on ``self.X`` / ``self.y`` and save the weights."""
        self.model.fit(self.X, self.y, nb_epoch=self.nb_epoch,
                       batch_size=self.batch_size,
                       callbacks=self.callbacks_list)
        self._save_weights()

    def generate(self):
        """Generate 100 predictions seeded from a random training window.

        Each predicted index is written to stdout as it is produced.

        Returns:
            list: The argmax index of each of the 100 predictions.
        """
        # randint's upper bound is exclusive, so this can select any window.
        start = numpy.random.randint(0, self.X.shape[0])
        # Work on a copy: numpy.reshape may return a view, and writing the
        # prediction into it would silently alter the training data.
        pattern = numpy.array(self.X[start], copy=True).reshape(
            (1, self.X.shape[1], self.X.shape[2]))
        output = []
        for _ in range(100):
            prediction = self.model.predict(pattern, verbose=0)
            index = numpy.argmax(prediction)
            sys.stdout.write(str(index))
            output.append(index)
            # NOTE(review): the window is not advanced; every step reuses
            # the seed window with only its last sample replaced by the
            # previous prediction - confirm this is the intended scheme.
            pattern[0, -1, 0] = index
        print("\nGeneration Complete.")
        return output

    def compute(self, input_signal, seq_length=None):
        """Predict the output for every full window of ``input_signal``.

        Args:
            input_signal: 2-D array indexed as (sample, input channel).
            seq_length: Window length; defaults to the length the instance
                was built with.

        Returns:
            list: One prediction per window, ``len(input_signal) -
            seq_length`` entries in total.
        """
        if seq_length is None:
            seq_length = self.seq_length
        print("\nComputing Model")
        window_shape = (1, seq_length, self.X.shape[2])
        output = []
        for i in range(len(input_signal) - seq_length):
            x = numpy.reshape(input_signal[i:seq_length + i], window_shape)
            prediction = self.model.predict(x, verbose=0)
            output.append(prediction[0])
        print("\nCompute Complete.")
        return output
def main():
    """Train on a toy period-3 spike train and sample from the fitted model."""
    n_samples = 10000
    phase = numpy.arange(n_samples) % 3
    # Stimulus: a spike on every third sample, starting at sample 0.
    stimulus = (phase == 0).astype(float).reshape(n_samples, 1)
    # Response: the same spike train delayed by one sample.
    response = (phase == 1).astype(float).reshape(n_samples, 1)
    trainer = spike_trainer(stimulus, response, nb_epoch=3)
    trainer.fit_model()
    trainer.generate()


if __name__ == "__main__":
    main()