# pictureclassifier.py
import pickle
import os
import numpy as np
import pandas as pa
import category_encoders as ce
import shutil
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from genericpath import isfile
import tensorflow as tf
from tensorflow.compiler.tf2xla.python.xla import random_normal
from tensorflow.python.ops.variables import trainable_variables
import constants
from objectclassifier import Object_classifier
from objectfinder import Recognized_object
class Picture_classifier:
    """Sort pictures into class folders based on the objects recognized in them.

    An ``Object_classifier`` turns every picture into a row of features (one
    summed probability per "main_category:sub_category"). A small feed-forward
    network (two hidden layers) maps that row to the binary-encoded index of
    the picture class. Class names are the top folder names of the training
    path.

    Use the constructor to create and train a new instance, or the static
    ``deserialize`` method to load a previously serialized instance from file.
    """

    def __init__(self, path: str, destroy_previous: bool = False, impertinent: bool = False) -> None:
        """Create a new classifier and train it with the pictures under *path*.

        Args:
            path: Root of the training directory structure (surrounding double
                quotes are stripped).
            destroy_previous: Passed on to the object finders and, when True,
                forces fresh network weights.
            impertinent: Passed on unchanged to
                ``Object_classifier.make_object_finder``.
        """
        path = path.strip('\"')
        self.__encoder = ce.BinaryEncoder(cols="classification")
        self.__optimizer = tf.keras.optimizers.SGD(learning_rate=constants.PICTURE_LEARNING_RATE)
        self.__classes = self.__get_classes(path)
        self.__object_classifier = Object_classifier()
        self.__weights = None
        self.__biases = None
        # Scaler fitted during training; it must also be applied at inference.
        self.__scaler = None
        # The training routine returns None; the attribute is kept so the
        # instance layout stays the same as before.
        self.__model = self.__create_or_train_picture_classifier_model(path, destroy_previous, impertinent)
        self.__features = self.__object_classifier.get_categories()

    def __del__(self) -> None:
        """Serialize this instance before it is destructed."""
        self.__serialize_picture_classifier()

    def __serialize_picture_classifier(self) -> None:
        """Pickle this instance to ``constants.PICTURE_CLASSIFIER_NAME``.

        The pickle is written to a temporary file first and then moved over
        the target, so a failing dump does not destroy the previously saved
        classifier. Errors are reported on the terminal and never raised,
        because this runs from ``__del__``.
        """
        temp_name = None
        try:
            target_name = constants.PICTURE_CLASSIFIER_NAME
            temp_name = f"{target_name}.tmp"
            with open(temp_name, "wb") as out_file:
                pickle.dump(self, out_file)
            os.replace(temp_name, target_name)
        except Exception as err:
            print(f"Unexpected error when serializing Picture_classifier: {err=}, {type(err)=}")
            if temp_name is not None and os.path.isfile(temp_name):
                try:
                    os.remove(temp_name)
                except OSError:
                    # Best effort clean-up only; the original error was reported above.
                    pass

    def __get_classes(self, path: str) -> list:
        """Return the classes into which pictures are classified.

        Classes already known to this instance come first; every sub-folder
        of *path* that is not yet known is appended.
        """
        try:
            classes = list(self.__classes)
        except (AttributeError, TypeError):
            # No classes yet (called from the constructor or on a bare instance).
            classes = []
        path = path.strip('\"')
        for item in os.listdir(path):
            if item not in classes and os.path.isdir(os.path.join(path, item)):
                classes.append(item)
        return classes

    def __create_or_train_picture_classifier_model(self, path: str, destroy_previous: bool = False,
                                                   impertinent: bool = False) -> None:
        """Train the picture classifier network with the pictures under *path*.

        Expected directory structure:
            path
                folders for picture classification: folder names are classes
                    folders for object classification (Object_finders):
                    folder names are main_categories
                        folders for further object classification inside an
                        Object_finder: folder names are sub_categories
                            picture files for model training

        Args:
            path: Root of the directory structure described above.
            destroy_previous: When True the network starts from fresh weights,
                otherwise previously trained weights are reused when present.
            impertinent: Passed on unchanged to the object finders.

        Raises:
            ValueError: If no training picture is found under *path*.
        """
        path = path.strip('\"')
        # First make all object finders or update them. This makes object classifiers trained.
        self.__train_object_finders(path, destroy_previous, impertinent)

        # Second gather data: one row per picture, one column per feature
        # (sub_category) plus one column for the class (classification).
        dataset = self.__build_training_dataset(path)
        if dataset.empty:
            raise ValueError(f"No training pictures found under: {path}")

        # The dependent variable is a class name. Replace it by its index in
        # the class list (every row, including the last one) and let the
        # binary encoder turn that index into bit columns.
        # NOTE(review): category_encoders presumably numbers categories in
        # order of first appearance, not by value - confirm that the decoded
        # number really equals the class index when classes appear in another
        # order or have no pictures.
        dataset["classification"] = [self.__classes.index(label) for label in dataset["classification"]]
        dataset = self.__encoder.fit_transform(dataset)

        # Separate feature matrix X from the dependent variable y and divide
        # both into train and test parts.
        classification = [column for column in dataset.columns if "classification" in column]
        num_binary_columns = len(classification)
        X = dataset.drop(classification, axis=1).to_numpy()
        y = dataset[classification].to_numpy()
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=125)

        # Scale all feature columns. The scaler is fitted with the train part
        # only and kept, because pictures classified later need the same scaling.
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        self.__scaler = scaler

        # Setup training parameters.
        num_features = len(self.__object_classifier.get_categories())
        training_steps = constants.PICTURE_TRAINING_STEPS
        batch_size = constants.PICTURE_BATCH_SIZE
        display_step = constants.PICTURE_DISPLAY_STEP
        n_hidden = constants.PICTURE_NUMBER_OF_HIDDEN_NEURONS

        self.__initialize_parameters(num_features, n_hidden, num_binary_columns, destroy_previous)
        # Some variables are replaced on every training run and plain SGD keeps
        # no per-variable state, so start each run with a fresh optimizer.
        self.__optimizer = tf.keras.optimizers.SGD(learning_rate=constants.PICTURE_LEARNING_RATE)

        # Use tf.data API to shuffle and batch data.
        train_data = tf.data.Dataset.from_tensor_slices((X_train, y_train))
        train_data = train_data.repeat().shuffle(60000).batch(batch_size).prefetch(1)

        # Run training for the given number of steps.
        for index, (batch_x, batch_y) in enumerate(train_data.take(training_steps), start=1):
            self.__optimization(batch_x, batch_y)
            # Every DISPLAY_STEP times we print the situation into the terminal.
            if index % display_step == 0:
                loss = self.__cross_entropy(self.__forward(batch_x), batch_y)
                acc = self.__accuracy(self.__neural_net(batch_x), batch_y)
                print(f"Picture classifier training epoch: {index}, loss: {loss}, accuracy: {acc}")

        # Finally test model on validation set.
        pred = self.__neural_net(X_test)
        print(f"Test accuracy of Picture Classifier: {self.__accuracy(pred, y_test)}")
        self.__features = self.__object_classifier.get_categories()

    def __train_object_finders(self, path: str, destroy_previous: bool, impertinent: bool) -> None:
        """Make or update an object finder for every folder inside every class folder."""
        for class_name in self.__classes:
            classpath = os.path.join(path, class_name)
            try:
                for object_finder in os.listdir(classpath):
                    object_finder_path = os.path.join(classpath, object_finder)
                    self.__object_classifier.make_object_finder(object_finder_path, destroy_previous, impertinent)
            except Exception as err:
                # Best effort: a class folder that cannot be processed is
                # skipped, but the reason is no longer hidden.
                print(f"Skipping object finders under {classpath}: {err=}, {type(err)=}")

    def __class_of_directory(self, root: str, path: str) -> str:
        """Return the top folder name of *root* relative to *path*.

        Returns an empty string when *root* is *path* itself, i.e. when the
        directory does not belong to any class folder.
        """
        relative = os.path.relpath(root, path)
        if relative == os.curdir:
            return ""
        return relative.split(os.sep)[0]

    def __build_training_dataset(self, path: str) -> "pa.DataFrame":
        """Collect one row of features and the class name for every file below the class folders."""
        data = {category: [] for category in self.__object_classifier.get_categories()}
        data["classification"] = []
        for root, _dirs, files in os.walk(path):
            label = self.__class_of_directory(root, path)
            if not label:
                # Files directly in path do not belong to any class.
                continue
            for file in files:
                filepath = os.path.join(root, file)
                recognized_objects = self.__object_classifier.recognize_objects_str(filepath)[0]
                row_of_features = self.__parse_recognized_objects_dict(recognized_objects)
                for feature, value in row_of_features.items():
                    data[feature].append(value)
                data["classification"].append(label)
        return pa.DataFrame(data)

    def __initialize_parameters(self, num_features: int, n_hidden: int, num_outputs: int,
                                destroy_previous: bool) -> None:
        """Create the weights and biases or extend the previously trained ones.

        Fresh values are used when *destroy_previous* is True, when nothing
        has been trained yet, or when the old hidden layer does not fit into
        the new layer sizes. Otherwise the old "h1" weights become a part of
        the new, possibly bigger "h1" tensor, the second hidden layer is kept
        and the output layer is created anew.
        """
        initializer = tf.random_normal_initializer()

        def variable(initial_value) -> "tf.Variable":
            return tf.Variable(tf.cast(initial_value, dtype="float64"), trainable=True)

        reusable = False
        if not destroy_previous and self.__weights is not None and self.__biases is not None:
            old_shape = self.__weights["h1"].shape
            reusable = old_shape[0] <= num_features and old_shape[1] == n_hidden
            if not reusable:
                print("Previous Picture_classifier weights do not fit the new layer sizes, starting from fresh weights.")

        if not reusable:
            # Store layers weight and bias: use random value generator to initialize weights.
            self.__weights = {
                "h1": variable(initializer([num_features, n_hidden])),
                "h2": variable(initializer([n_hidden, n_hidden])),
                "out": variable(initializer([n_hidden, num_outputs])),
            }
            self.__biases = {
                "b1": variable(tf.zeros([n_hidden])),
                "b2": variable(tf.zeros([n_hidden])),
                "out": variable(tf.zeros([num_outputs])),
            }
            return

        # Set h1 variables: copy every old value into the same position of a
        # new randomly initialized tensor.
        old_h1 = self.__weights["h1"]
        new_h1 = variable(initializer([num_features, n_hidden]))
        indices = [[i, j] for i in range(old_h1.shape[0]) for j in range(old_h1.shape[1])]
        updates = tf.reshape(old_h1, [-1])
        self.__weights["h1"] = tf.Variable(tf.tensor_scatter_nd_update(new_h1, indices, updates), trainable=True)
        # Set out variables.
        self.__weights["out"] = variable(initializer([n_hidden, num_outputs]))
        self.__biases["out"] = variable(tf.zeros([num_outputs]))

    def __forward(self, input_data: "tf.Tensor") -> "tf.Tensor":
        """Return one probability per binary-encoded class bit.

        The output uses a sigmoid per bit, because several bits of a binary
        code can be set at the same time. The result is differentiable and is
        what the training loss is computed from.
        """
        hidden_layer1 = tf.add(tf.matmul(input_data, self.__weights["h1"]), self.__biases["b1"])
        hidden_layer1 = tf.nn.relu(hidden_layer1)
        hidden_layer2 = tf.add(tf.matmul(hidden_layer1, self.__weights["h2"]), self.__biases["b2"])
        hidden_layer2 = tf.nn.sigmoid(hidden_layer2)
        out_layer = tf.add(tf.matmul(hidden_layer2, self.__weights["out"]), self.__biases["out"])
        return tf.nn.sigmoid(out_layer)

    def __neural_net(self, input_data: "tf.Tensor") -> "tf.Tensor":
        """Return the predicted class bits (0.0 or 1.0) for every input row.

        This is the core of the picture classifier: probabilities of at least
        0.5 become 1.0, all others 0.0.
        """
        probabilities = self.__forward(input_data)
        return tf.cast(probabilities >= 0.5, probabilities.dtype)

    def __cross_entropy(self, y_pred: "tf.Tensor", y_true: "tf.Tensor") -> "tf.Tensor":
        """Return the binary cross-entropy between predicted probabilities and real bits.

        The loss is summed over the bits of one row and averaged over the rows.
        """
        y_true = tf.cast(y_true, dtype="float64")
        # Clip from both sides so that neither log(p) nor log(1 - p) gets zero as input.
        y_pred = tf.clip_by_value(tf.cast(y_pred, dtype="float64"), 1e-9, 1.0 - 1e-9)
        log_likelihood = y_true * tf.math.log(y_pred) + (1.0 - y_true) * tf.math.log(1.0 - y_pred)
        return tf.reduce_mean(-tf.reduce_sum(log_likelihood, axis=1))

    def __optimization(self, X: "tf.Tensor", y: "tf.Tensor") -> None:
        """Adjust the weights and biases with one gradient descent step."""
        trainable_variables = list(self.__weights.values()) + list(self.__biases.values())
        with tf.GradientTape() as tape:
            tape.watch(trainable_variables)
            # The loss uses the probabilities, not the thresholded bits:
            # thresholding has a zero gradient and would freeze the weights.
            loss = self.__cross_entropy(self.__forward(X), y)
        gradients = tape.gradient(loss, trainable_variables)
        self.__optimizer.apply_gradients(zip(gradients, trainable_variables))

    def __accuracy(self, y_pred: "tf.Tensor", y_true: "tf.Tensor") -> "tf.Tensor":
        """Return, per output bit, the share of rows in which the bit was predicted right."""
        correct_prediction = tf.equal(tf.cast(y_pred, tf.int64), tf.cast(y_true, tf.int64))
        return tf.reduce_mean(tf.cast(correct_prediction, tf.float32), axis=0)

    def __parse_recognized_objects_dict(self, recognized_objects: list) -> dict:
        """Parse one row of features as a dictionary.

        Every category of the object classifier is a key; the value is the sum
        of the probabilities of the recognized objects of that category (0 if
        there are none). Objects of unknown categories are ignored.
        """
        row_of_features = dict.fromkeys(self.__object_classifier.get_categories(), 0)
        for recognized in recognized_objects:
            feature = f"{recognized.main_category}:{recognized.sub_category}"
            if feature in row_of_features:
                row_of_features[feature] += recognized.probability
        return row_of_features

    def __parse_recognized_objects_tensor(self, recognized_objects: list) -> "tf.Variable":
        """Parse one row of features as a tensor of shape [1, number of features]."""
        feature_titles = self.__object_classifier.get_categories()
        totals = self.__parse_recognized_objects_dict(recognized_objects)
        row_of_features = [totals[title] for title in feature_titles]
        return tf.Variable(initial_value=[row_of_features], shape=[1, len(self.__features)])

    def __scale_features(self, features: "tf.Tensor"):
        """Apply the scaler fitted during training to *features*.

        Instances serialized before the scaler was stored do not have one; in
        that case the features are returned unchanged.
        """
        scaler = getattr(self, "_Picture_classifier__scaler", None)
        if scaler is None:
            return features
        return scaler.transform(features.numpy())

    def __binary_decode(self, bit_array: list) -> int:
        """Decode binary encoded data to int.

        The first bit has the weight 1, the second 2, and so on. The result
        is one less than the encoded number, so all bits zero gives -1.
        NOTE(review): verify that the first column written by the binary
        encoder really is the least significant bit.
        """
        return -1 + sum(int(bit) * pow(2, index) for index, bit in enumerate(bit_array))

    @staticmethod
    def deserialize() -> object:
        """Deserialize a Picture_classifier instance from file.

        Returns None if the file does not exist or cannot be loaded.
        Unpickling runs code stored in the file, so the file must come from a
        trusted source.
        """
        try:
            with open(constants.PICTURE_CLASSIFIER_NAME, "rb") as in_file:
                return pickle.load(in_file)
        except FileNotFoundError:
            # Nothing has been serialized yet.
            return None
        except Exception as err:
            print(f"Unexpected error when deserializing Picture_classifier: {err=}, {type(err)=}")
            return None

    def classify_pictures(self, input_path: str, output_path: str) -> None:
        """Move pictures located in input_path into classified folders in output_path.

        A folder named after the predicted class is created in *output_path*
        when needed. Pictures whose prediction does not match any class are
        moved to the "Unsorted pictures" folder.
        """
        input_path = input_path.strip('\"')
        output_path = output_path.strip('\"')
        pictures = os.listdir(input_path)
        if not pictures:
            print(f"The folder does not contain files: {input_path}")
            return
        for picture in pictures:
            picture_path = os.path.join(input_path, picture)
            if not os.path.isfile(picture_path):
                continue
            recognized_objects = self.__object_classifier.multi_tile_recognize_objects(picture_path)
            feature_tensor = tf.cast(self.__parse_recognized_objects_tensor(recognized_objects), dtype="float64")
            # The network was trained with scaled features.
            pred = self.__neural_net(self.__scale_features(feature_tensor))
            # pred is in a binary-encoded format, need to decode it to get the index of the class name.
            bit_list = pred.numpy()[0].tolist()
            decoded = self.__binary_decode(bit_list)
            if 0 <= decoded < len(self.__classes):
                class_name = self.__classes[decoded]
            else:
                # If index is out of range (all bits zero give -1), put the
                # picture to the "Unsorted pictures" folder.
                class_name = "Unsorted pictures"
            # Create a folder name <class_name>, if it doesn't exist.
            directory = os.path.join(output_path, class_name)
            os.makedirs(directory, exist_ok=True)
            # Move picture from input_path to directory.
            shutil.move(picture_path, directory)

    def train_picture_classifier_model(self, path: str, destroy_previous: bool, impertinent: bool) -> None:
        """Train the model of this instance; no new Picture_classifier is created."""
        path = path.strip('\"')
        self.__classes = self.__get_classes(path)
        if self.__object_classifier is None:
            self.__object_classifier = Object_classifier()
        self.__create_or_train_picture_classifier_model(path, destroy_previous, impertinent)