-
Notifications
You must be signed in to change notification settings - Fork 4
/
train.py
201 lines (159 loc) · 8.31 KB
/
train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3
import os
import cv2
import numpy as np
import random
import argparse
import tensorflow as tf
import tensorflow.keras as keras
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Build a Keras model given some parameters
def create_model(captcha_length, captcha_num_symbols, input_shape, model_depth=5, module_size=2):
    """Assemble the multi-output convolutional network used to read captchas.

    The network is a stack of ``model_depth`` modules. Every module applies
    ``module_size`` rounds of Conv2D -> BatchNormalization -> ReLU and is
    followed by a 2x2 max-pooling layer. The number of convolution filters is
    ``32 * 2 ** min(module_index, 3)``, i.e. it doubles per module and stops
    growing after the fourth module. The flattened features feed one softmax
    head per captcha character, named ``char_1`` ... ``char_<captcha_length>``.

    NOTE(review): the indentation of this file was lost in extraction; pooling
    is placed once per module (after the inner loop) -- confirm against the
    upstream copy.

    Args:
        captcha_length: number of output heads (one per character position).
        captcha_num_symbols: width of every softmax head.
        input_shape: shape passed to ``keras.Input``.
        model_depth: how many convolutional modules to stack.
        module_size: how many convolution rounds each module contains.

    Returns:
        An uncompiled ``keras.Model`` with ``captcha_length`` outputs.
    """
    layers = keras.layers
    inputs = keras.Input(input_shape)
    features = inputs

    for module_index in range(model_depth):
        # Filter count grows 32 -> 64 -> 128 -> 256 and then stays at 256.
        filters = 32 * 2 ** min(module_index, 3)
        for _ in range(module_size):
            features = layers.Conv2D(filters, kernel_size=3, padding='same',
                                     kernel_initializer='he_uniform')(features)
            features = layers.BatchNormalization()(features)
            features = layers.Activation('relu')(features)
        features = layers.MaxPooling2D(2)(features)

    features = layers.Flatten()(features)

    # One independent classifier per character position, all sharing the same features.
    heads = []
    for position in range(1, captcha_length + 1):
        classifier = layers.Dense(captcha_num_symbols, activation='softmax',
                                  name='char_%d' % position)
        heads.append(classifier(features))

    return keras.Model(inputs=inputs, outputs=heads)
# A Sequence represents a dataset for training in Keras
# In this case, we have a folder full of images
# Elements of a Sequence are *batches* of images, of some size batch_size
class ImageSequence(keras.utils.Sequence):
    """Keras ``Sequence`` that serves randomly drawn batches of captcha images.

    Every file in ``directory_name`` is treated as one sample. The label of a
    sample is its file name up to the first ``.``, with anything from the first
    ``_`` onwards removed (files are saved as ``TEXT_num.png`` when several
    captchas share the text ``TEXT``).

    Files are drawn without replacement: a drawn file moves from ``self.files``
    to ``self.used_files``. The pool is refilled from ``self.used_files`` when
    an epoch ends and whenever it runs dry, so the sequence can be iterated for
    any number of epochs.
    """

    def __init__(self, directory_name, batch_size, captcha_length, captcha_symbols, captcha_width, captcha_height,
                 is_audio):
        """Index the files of ``directory_name`` and remember the batch geometry.

        Args:
            directory_name: folder containing the image files.
            batch_size: number of images per batch.
            captcha_length: number of character positions (label arrays) per sample.
            captcha_symbols: string of symbols; a character's index in it is its class.
            captcha_width: width used for the batch array.
            captcha_height: height used for the batch array.
            is_audio: when falsy, images are binarised and dilated before use;
                when truthy they are only converted from BGR to RGB.
        """
        super().__init__()
        self.directory_name = directory_name
        self.batch_size = batch_size
        self.captcha_length = captcha_length
        self.captcha_symbols = captcha_symbols
        self.captcha_width = captcha_width
        self.captcha_height = captcha_height
        self.is_audio = is_audio

        file_list = os.listdir(self.directory_name)
        # Maps label (file name up to the first '.') -> file name.
        self.files = {name.split('.')[0]: name for name in file_list}
        self.used_files = []
        self.count = len(file_list)

    def __len__(self):
        """Return the number of complete batches available per epoch."""
        return int(np.floor(self.count / self.batch_size))

    def on_epoch_end(self):
        """Make every file drawable again for the next epoch."""
        self._recycle_used_files()

    def _recycle_used_files(self):
        """Move all previously drawn files back into the pool of drawable files."""
        self.files.update({name.split('.')[0]: name for name in self.used_files})
        self.used_files = []

    def _load_image(self, file_name):
        """Read one image and return it as an RGB float array scaled to [0, 1]."""
        path = os.path.join(self.directory_name, file_name)
        raw_data = cv2.imread(path)
        if raw_data is None:
            # cv2.imread signals an unreadable file by returning None; fail with the path
            # instead of an opaque error from the first OpenCV call below.
            raise ValueError('Could not read image file: ' + path)

        if not self.is_audio:
            # Gray-scale the image
            gray_data = cv2.cvtColor(raw_data, cv2.COLOR_BGR2GRAY)
            # Apply an adaptive (local mean) threshold to binarise it
            adaptive = cv2.adaptiveThreshold(gray_data, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 11, 2)
            kernel = np.ones((1, 1), np.uint8)
            # Dilate, then erode, with the 1x1 kernel
            dilation = cv2.dilate(adaptive, kernel, iterations=1)
            erosion = cv2.erode(dilation, kernel, iterations=1)
            # Dilate once more with a 4x1 kernel
            kernel = np.ones((4, 1), np.uint8)
            dilation = cv2.dilate(erosion, kernel, iterations=1)
            # Convert back to three channels so both branches yield the same image shape
            rgb_data = cv2.cvtColor(dilation, cv2.COLOR_GRAY2RGB)
        else:
            rgb_data = cv2.cvtColor(raw_data, cv2.COLOR_BGR2RGB)

        # Scale the 8-bit pixel values to the range [0, 1] for Keras
        return np.array(rgb_data) / 255.0

    def __getitem__(self, idx):
        """Build one batch of randomly chosen, not-yet-used images.

        ``idx`` is not used to select files; samples are drawn at random.

        Returns:
            ``(X, y)`` where ``X`` has shape
            ``(batch_size, captcha_height, captcha_width, 3)`` and ``y`` is a list of
            ``captcha_length`` one-hot arrays of shape
            ``(batch_size, len(captcha_symbols))``.

        Raises:
            IndexError: if the directory contains no files at all.
            ValueError: if an image file cannot be read.
        """
        X = np.zeros((self.batch_size, self.captcha_height, self.captcha_width, 3), dtype=np.float32)
        y = [np.zeros((self.batch_size, len(self.captcha_symbols)), dtype=np.uint8)
             for _ in range(self.captcha_length)]

        for i in range(self.batch_size):
            if not self.files:
                # Every file has been handed out already: start reusing them rather than
                # failing on an empty pool (this used to crash once the pool drained).
                self._recycle_used_files()
                if not self.files:
                    raise IndexError('No image files available in ' + str(self.directory_name))

            random_image_label = random.choice(list(self.files.keys()))
            # We've used this image now, so we can't repeat it in this iteration
            random_image_file = self.files.pop(random_image_label)
            self.used_files.append(random_image_file)

            X[i] = self._load_image(random_image_file)

            # We have a little hack here - we save captchas as TEXT_num.png if there is more than one captcha
            # with the text "TEXT", so the real label should have the "_num" stripped out.
            captcha_text = random_image_label.split('_')[0]
            for j, ch in enumerate(captcha_text):
                # NOTE(review): str.find returns -1 for a character that is not in captcha_symbols,
                # which marks the LAST symbol column; a text longer than captcha_length raises
                # IndexError on y[j]. Both behaviours are kept as they were.
                y[j][i, self.captcha_symbols.find(ch)] = 1

        return X, y
def _parse_bool_flag(value):
    """Convert a command-line string such as ``"False"`` or ``"1"`` into a bool.

    ``type=bool`` cannot be used with argparse for this: ``bool("False")`` is
    ``True`` because every non-empty string is truthy.
    """
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1', 'on'):
        return True
    if normalized in ('false', 'f', 'no', 'n', '0', 'off', ''):
        return False
    raise argparse.ArgumentTypeError('Expected a boolean value, got %r' % value)


def main():
    """Parse the command line, build (or resume) the captcha model and train it.

    All options except ``--input-model`` and ``--is-audio`` are mandatory; when
    one is missing a message is printed and the process exits with status 1.
    The model architecture is written to ``<output-model-name>.json``, a
    checkpoint to ``<output-model-name>.h5`` after every epoch, and on Ctrl-C
    the current weights go to ``<output-model-name>_resume.h5``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--width', help='Width of captcha image', type=int)
    parser.add_argument('--height', help='Height of captcha image', type=int)
    parser.add_argument('--length', help='Length of captchas in characters', type=int)
    parser.add_argument('--batch-size', help='How many images in training captcha batches', type=int)
    parser.add_argument('--train-dataset', help='Where to look for the training image dataset', type=str)
    parser.add_argument('--validate-dataset', help='Where to look for the validation image dataset', type=str)
    parser.add_argument('--output-model-name', help='Where to save the trained model', type=str)
    parser.add_argument('--input-model', help='Where to look for the input model to continue training', type=str)
    parser.add_argument('--epochs', help='How many training epochs to run', type=int)
    parser.add_argument('--symbols', help='File with the symbols to use in captchas', type=str)
    # Parsed explicitly so that "--is-audio False" really selects image mode.
    parser.add_argument('--is-audio', help='trains model for audio captcha', type=_parse_bool_flag, default=True)
    args = parser.parse_args()

    # Mandatory options, checked in this order; the first missing one ends the program.
    required_options = (
        ('width', "Please specify the captcha image width"),
        ('height', "Please specify the captcha image height"),
        ('length', "Please specify the captcha length"),
        ('batch_size', "Please specify the training batch size"),
        ('epochs', "Please specify the number of training epochs to run"),
        ('train_dataset', "Please specify the path to the training data set"),
        ('validate_dataset', "Please specify the path to the validation data set"),
        ('output_model_name', "Please specify a name for the trained model"),
        ('symbols', "Please specify the captcha symbols file"),
    )
    for attribute, message in required_options:
        if getattr(args, attribute) is None:
            print(message)
            # SystemExit is raised directly; the exit() helper only exists when the
            # site module has been loaded.
            raise SystemExit(1)

    if args.is_audio:
        print("Training model for Audio Captcha")
    else:
        print("Training model for Image Captcha")

    with open(args.symbols) as symbols_file:
        # NOTE(review): readline() keeps a trailing newline if the file has one, so the
        # newline is counted as a symbol when sizing the output layers below. Left as is
        # because stripping it changes the model shape -- confirm against the code that
        # loads the trained model before changing.
        captcha_symbols = symbols_file.readline()

    # NOTE(review): the file's indentation was lost; everything below is assumed to run
    # inside the GPU device scope -- confirm against the upstream copy.
    with tf.device('/device:GPU:0'):
        model = create_model(args.length, len(captcha_symbols), (args.height, args.width, 3))

        # Optionally continue training from previously saved weights.
        if args.input_model is not None:
            model.load_weights(args.input_model)

        model.compile(loss='categorical_crossentropy',
                      optimizer=keras.optimizers.Adam(1e-3, amsgrad=True),
                      metrics=['accuracy'])
        model.summary()

        training_data = ImageSequence(args.train_dataset, args.batch_size, args.length, captcha_symbols,
                                      args.width, args.height, args.is_audio)
        validation_data = ImageSequence(args.validate_dataset, args.batch_size, args.length, captcha_symbols,
                                        args.width, args.height, args.is_audio)

        callbacks = [keras.callbacks.EarlyStopping(patience=3),
                     keras.callbacks.ModelCheckpoint(args.output_model_name + '.h5', save_best_only=False)]

        # Save the model architecture to JSON
        with open(args.output_model_name + ".json", "w") as json_file:
            json_file.write(model.to_json())

        try:
            # NOTE(review): fit_generator is deprecated in newer TensorFlow releases in
            # favour of Model.fit -- kept to match the TensorFlow version this script targets.
            model.fit_generator(generator=training_data,
                                validation_data=validation_data,
                                epochs=args.epochs,
                                callbacks=callbacks,
                                use_multiprocessing=True)
        except KeyboardInterrupt:
            print('KeyboardInterrupt caught, saving current weights as ' + args.output_model_name + '_resume.h5')
            model.save_weights(args.output_model_name + '_resume.h5')
# Start training only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()