tensorflow2.0 利用卷積神經網絡對notMNIST數據集的識別
0.背景
工作的目標是對notMNIST數據集的識別,這是這個學期選修的神經網絡課的期末大作業。notMNIST數據集與MNIST數據集相似,包含了590000+個28*28大小的A-J字符的灰度圖像。因為做大作業的時候也正值tensorflow2.0正式版發布不久,因此這次作業是使用2.0版的tensorflow完成的。個人水平有限,最終只是初步地完成了對notMNIST_small數據集的訓練與識別,經過20000次迭代(每次迭代訓練一個batch),最終正確率為0.98。
1.環境
在編程環境上,使用了anaconda配置了python3.7+tensorflow2.0的環境。
2.實現
2.1思路
由於之前對神經網絡的框架包括tensorflow、pytorch都不熟悉,我首先找了關於tensorflow2.0的相關資料,其中對我幫助最大的是《簡單粗暴tensorflow2.0》。最終實現對notMNIST識別的網絡架構就是此教程里的例程。可以說,對於一個初次接觸神經網絡實現的人而言,tensorflow2.0表現出的代碼是很優雅的,比其他地方看到的tensorflow1.x的源碼乾淨許多,但是相應的,網絡上能直接使用的資料也少了很多。
言歸正傳,搭建好環境后,我利用教程里的MNIST進行了測試,能實現對MNIST的識別,效果與預期相當,並且代碼可讀性很好。希望能在教程的基礎上對代碼進行改寫完成任務。現貼出MNIST的例程:
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import datasets
import os
class MNISTLoader():
    """Hold the Keras MNIST dataset in memory and hand out random batches.

    After construction the instance exposes:
      * ``train_data`` / ``test_data``: images cast to float32, divided by
        255.0, with one extra trailing axis appended.
      * ``train_lable`` / ``test_lable``: labels cast to int32 (the attribute
        spelling is kept because other code reads these names).
      * ``num_train_data`` / ``num_test_data``: number of samples in each split.
    """

    def __init__(self):
        (train_images, train_labels), (test_images, test_labels) = \
            tf.keras.datasets.mnist.load_data()

        # Scale pixel values and append a trailing axis to every image.
        self.train_data = np.expand_dims(
            train_images.astype(np.float32) / 255.0, axis=-1)
        self.test_data = np.expand_dims(
            test_images.astype(np.float32) / 255.0, axis=-1)

        self.train_lable = train_labels.astype(np.int32)
        self.test_lable = test_labels.astype(np.int32)

        self.num_train_data = self.train_data.shape[0]
        self.num_test_data = self.test_data.shape[0]

    def get_batch(self, batch_size):
        """Return ``batch_size`` training samples drawn with replacement.

        Returns a ``(images, labels)`` tuple whose rows correspond to the
        same randomly chosen training indices.
        """
        sample_count = np.shape(self.train_data)[0]
        picks = np.random.randint(0, sample_count, batch_size)
        return self.train_data[picks, :], self.train_lable[picks]
class MLP(tf.keras.Model):
    """Two-layer perceptron: flatten -> Dense(100, ReLU) -> Dense(10) -> softmax."""

    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        """Return softmax outputs over the 10 units of the last layer."""
        hidden = self.dense1(self.flatten(inputs))
        logits = self.dense2(hidden)
        return tf.nn.softmax(logits)
class CNN(tf.keras.Model):
    """Two conv/max-pool stages followed by two dense layers and a softmax.

    The reshape target of 7*7*64 corresponds to 28x28 inputs: both
    convolutions use 'same' padding and each pooling layer has stride 2
    (28 -> 14 -> 7), with 64 filters in the second convolution.
    """

    def __init__(self):
        super().__init__()
        # Both convolutions share everything except the filter count.
        conv_options = dict(kernel_size=[5, 5], padding='same', activation=tf.nn.relu)
        pool_options = dict(pool_size=[2, 2], strides=2)

        self.conv1 = tf.keras.layers.Conv2D(filters=32, **conv_options)
        self.pool1 = tf.keras.layers.MaxPool2D(**pool_options)
        self.conv2 = tf.keras.layers.Conv2D(filters=64, **conv_options)
        self.pool2 = tf.keras.layers.MaxPool2D(**pool_options)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        """Run the layers in order and return softmax outputs."""
        pipeline = (
            self.conv1, self.pool1,
            self.conv2, self.pool2,
            self.flatten,
            self.dense1, self.dense2,
        )
        x = inputs
        for layer in pipeline:
            x = layer(x)
        return tf.nn.softmax(x)
# Hyper-parameters.
num_epochs = 5
batch_size = 50
learning_rate = 0.001

model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
summary_writer = tf.summary.create_file_writer('./tensorboard')

# NOTE(review): num_batches is computed from num_epochs here, but the loop
# below runs a fixed 100 steps and never reads it.
num_batches = int(data_loader.num_train_data // batch_size * num_epochs)

for batch_index in range(100):
    X, y = data_loader.get_batch(batch_size)

    with tf.GradientTape() as tape:
        y_pred = model(X)
        # Mean sparse categorical cross-entropy over the batch.
        loss = tf.reduce_mean(
            tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        )

    print("batch %d: loss %f" % (batch_index, loss.numpy()))
    with summary_writer.as_default():  # select the summary writer
        # Write the current loss value to the summary writer.
        tf.summary.scalar("loss", loss, step=batch_index)

    variables = model.variables
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, variables))
# Evaluate on the test split in consecutive slices of batch_size samples.
sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batches = int(data_loader.num_test_data // batch_size)

for batch_index in range(num_batches):
    start_index = batch_index * batch_size
    end_index = start_index + batch_size
    y_pred = model.predict(data_loader.test_data[start_index:end_index])
    sparse_categorical_accuracy.update_state(
        y_true=data_loader.test_lable[start_index:end_index],
        y_pred=y_pred,
    )

print("test accuracy : %f" % sparse_categorical_accuracy.result())
#model.save('1.tf')
2.2問題與解決
在過程中,我遇到的最大困難是對數據集的讀取。在例程以及其他網絡資料中,總愛以MNIST這種已經內置讀取方式的數據集為例,但在對這種沒有內置讀取方法的數據集操作時,很大程度需要一些編程的能力,這也是最困擾我的地方。將notMNIST數據集解壓后會得到10個文件夾,每個文件夾的名字就是其標簽。一般而言,tensorflow讀取數據集在小批量的時候可以將所有圖片一次讀入內存,但在大批量的時候一般使用從硬盤讀取,tensorflow推薦使用tfrecord格式完成,但在2.0版本中我屢次嘗試都沒能成功使用,最終還是用了讀入內存的形式。對數據的處理主要參考了kaggle,貼上最終的源碼:
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 9 19:41:27 2019
@author: 騰飛
"""
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import os
dir_ = 'E:/tfrecord/notMNIST_small/'

# Each sub-directory of dir_ is one class folder whose name is the label.
# Plain files lying next to the class folders are skipped; treating them as
# folders would make the nested os.listdir() call below fail.
letters = sorted(
    entry for entry in os.listdir(dir_)
    if os.path.isdir(os.path.join(dir_, entry))
)

# Retrieve picture file names, keyed by class folder name.
pictures_files = {}
for letter in letters:
    pictures_files[letter] = [
        name for name in os.listdir(os.path.join(dir_, letter))
        if name.endswith('.png')
    ]

# Read the actual pictures into memory, keyed by class folder name.
data = {}
for letter in letters:
    print('---------------------------')
    print('Retrieving for %s' % letter)
    print('---------------------------')
    images = []
    for name in pictures_files[letter]:
        try:
            images.append(plt.imread(os.path.join(dir_, letter, name)))
        except Exception as e:
            # Best effort: an image that cannot be read is reported and
            # skipped so one bad file does not abort the whole load.
            print(e, name)
    data[letter] = images
print('Done')
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Merge all per-letter image lists into one image list and one label list.
# Both comprehensions walk `data` in the same order, so X[i] pairs with Y[i].
X = [img for images in data.values() for img in images]
Y = [key for key, images in data.items() for _ in images]

# Map the folder names 'A'..'J' to integer class ids 0..9.
lbl_enc = LabelEncoder()
labels = np.array(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'])
lbl_enc.fit(labels)
Y = lbl_enc.transform(Y)
Y_nd = Y

# Stack the images into one float32 array with a trailing channel axis.
# The array is sized from the data actually loaded rather than from a
# hard-coded image count.
X_nd = np.expand_dims(X, -1).astype('float32')
# matplotlib's imread already returns PNG images as floats in [0, 1], so
# only rescale when the pixel values are still in the 0-255 range.
# NOTE: checkpoints trained on inputs that were divided by 255 a second time
# saw a different input scale and should be retrained.
if X_nd.size and X_nd.max() > 1.0:
    X_nd = X_nd / 255.0

# 80% train+test / 20% dev, then 10% of the remainder is held out as test.
X_train, X_dev, Y_train, Y_dev = train_test_split(X_nd, Y_nd, test_size=.2)
X_train, X_test, Y_train, Y_test = train_test_split(X_train, Y_train, test_size=.1)

print('Training size: %s' % len(X_train))
print('DevSet size: %s' % len(X_dev))
print('TestSet size: %s' % (len(X_test)))
print(Y_train.shape)
print(X_train.shape)
def get_batch(batch_size):
    """Return ``batch_size`` random samples from the training split.

    Indices are drawn with replacement; the same indices select rows from
    the module-level ``X_train`` and ``Y_train``.
    """
    sample_count = np.shape(X_train)[0]
    picks = np.random.randint(0, sample_count, batch_size)
    batch_images = X_train[picks, :]
    batch_labels = Y_train[picks]
    return batch_images, batch_labels
class CNN(tf.keras.Model):
    """Convolutional classifier with 10 softmax outputs.

    Layout: Conv2D(32, 5x5) -> MaxPool -> Conv2D(64, 5x5) -> MaxPool ->
    reshape to 7*7*64 -> Dense(1024, ReLU) -> Dense(10) -> softmax.
    """

    def __init__(self):
        super().__init__()
        layers = tf.keras.layers

        # First convolution / pooling stage.
        self.conv1 = layers.Conv2D(
            filters=32, kernel_size=[5, 5], padding='same', activation=tf.nn.relu)
        self.pool1 = layers.MaxPool2D(pool_size=[2, 2], strides=2)

        # Second convolution / pooling stage.
        self.conv2 = layers.Conv2D(
            filters=64, kernel_size=[5, 5], padding='same', activation=tf.nn.relu)
        self.pool2 = layers.MaxPool2D(pool_size=[2, 2], strides=2)

        # Classifier head.
        self.flatten = layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = layers.Dense(units=10)

    def call(self, inputs):
        """Apply the layers in definition order and return softmax outputs."""
        stage1 = self.pool1(self.conv1(inputs))
        stage2 = self.pool2(self.conv2(stage1))
        features = self.flatten(stage2)
        logits = self.dense2(self.dense1(features))
        return tf.nn.softmax(logits)
# Hyper-parameters.
num_epochs = 5  # kept for reference; the loop length is set by num_steps
batch_size = 50
learning_rate = 0.001
num_steps = 20000        # number of mini-batch updates
checkpoint_every = 1000  # save a checkpoint after this many updates

model = CNN()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
summary_writer = tf.summary.create_file_writer('./tensorboard')
checkpoint = tf.train.Checkpoint(myAwesomeModel=model)

for batch_index in range(num_steps):
    X, y = get_batch(batch_size)

    with tf.GradientTape() as tape:
        y_pred = model(X)
        # Mean sparse categorical cross-entropy over the batch.
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)

    print("batch %d: loss %f" % (batch_index, loss.numpy()))
    with summary_writer.as_default():  # select the summary writer
        # Write the current loss value to the summary writer.
        tf.summary.scalar("loss", loss, step=batch_index)

    # Differentiate only with respect to the weights the optimizer updates.
    trainable = model.trainable_variables
    grads = tape.gradient(loss, trainable)
    optimizer.apply_gradients(grads_and_vars=zip(grads, trainable))

    # Save after every `checkpoint_every` completed updates. Counting
    # completed steps means the last checkpoint holds the fully trained
    # weights; saving on `batch_index % 1000 == 0` stored the untrained
    # model at step 0 and never saved the final 999 updates.
    if (batch_index + 1) % checkpoint_every == 0:
        path = checkpoint.save('./save/ctf_model_v.ckpt')
        print("model saved to %s" % path)
另外,寫了一小段程序對模型進行評估,我在用的時候是單獨寫了個test程序,主要代碼有:
# Rebuild the network and load the most recent checkpoint from ./save.
model_to_be_restored = CNN()
checkpoint = tf.train.Checkpoint(myAwesomeModel=model_to_be_restored)

latest_path = tf.train.latest_checkpoint('./save')
if latest_path is None:
    # Without this check a missing checkpoint would leave the model with
    # freshly initialised weights and report a meaningless accuracy.
    raise FileNotFoundError("no checkpoint found in './save'")
checkpoint.restore(latest_path)

# The predicted class is the index of the largest softmax output.
y_pred = np.argmax(model_to_be_restored.predict(X_test), axis=-1)

# Accuracy = number of matching labels / number of test samples.
dim = Y_test.shape[0]
right = int(np.sum(y_pred == Y_test))
print("the acc is %f" % (right / dim))
3.另附一些圖
3.1數據集解壓出的樣子

3.2數據集圖片詳情

3.3網絡結構示意

3.4loss下降過程

3.5測試正確率

4.一些說明
這個程序只是我一個選修課的大作業,做的其實很粗糙,包括由於讀取數據集的方式和手里的條件導致最終只是使用notMNIST_small數據集完成了訓練和測試,但這個整段代碼結構是很簡單也很易讀的,希望也能提供一些參考。就我對tensorflow2.0的使用體驗而言,tf2.0是很友好的,如果不考慮數據集的處理問題,那我這次實驗還是很順利的,如果后續對這個方向還有深入的學習,我會更新這部分內容。
