Recognizing the notMNIST Dataset with TensorFlow 2.0


Using a convolutional neural network in TensorFlow 2.0 to recognize the notMNIST dataset

0. Background

The goal of this project is recognition of the notMNIST dataset; it was the final assignment for the neural network elective I took this semester. notMNIST is similar to MNIST and contains 590,000+ grayscale images, each 28×28, of the characters A–J. Since the assignment coincided with the recent release of the official TensorFlow 2.0, I completed it with TensorFlow 2.0. My skills being limited, in the end I only finished training and recognition on the notMNIST_small subset; after 20,000 training iterations the final accuracy was 0.98.

1. Environment

For the programming environment, I used Anaconda to set up Python 3.7 + TensorFlow 2.0.
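
To verify the setup, a quick sanity check can be run inside the environment (a generic check, not part of the project code):

import tensorflow as tf

print(tf.__version__)              # should print a 2.0.x version
print(tf.test.is_gpu_available())  # True if TensorFlow can see a GPU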

2. Implementation

2.1 Approach

Since I was not previously familiar with neural-network frameworks, including TensorFlow and PyTorch, I first gathered material on TensorFlow 2.0; the most helpful resource was the tutorial 簡單粗暴 TensorFlow 2.0 ("A Concise Handbook of TensorFlow 2.0"). The network architecture I eventually used for notMNIST is the example from that tutorial. Speaking as someone implementing a neural network for the first time, TensorFlow 2.0 code is quite elegant, much cleaner than the TensorFlow 1.x code I had seen elsewhere; correspondingly, though, there is far less directly reusable material online.
Back to the point: after setting up the environment, I tested the tutorial's MNIST example. It recognized MNIST with the expected results, and the code was very readable, so I hoped to adapt it to the task at hand. Here is the MNIST example:

import numpy as np
import tensorflow as tf

class MNISTLoader():
    def __init__(self):
        mnist = tf.keras.datasets.mnist
        (self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
        # Scale to [0, 1] and add a channel dimension: [num, 28, 28] -> [num, 28, 28, 1]
        self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1)
        self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1)
        self.train_label = self.train_label.astype(np.int32)
        self.test_label = self.test_label.astype(np.int32)
        self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0]

    def get_batch(self, batch_size):
        # Randomly sample a batch from the training set
        index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
        return self.train_data[index, :], self.train_label[index]


class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.flatten(inputs)
        x = self.dense1(x)
        x = self.dense2(x)
        output = tf.nn.softmax(x)
        return output

class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu,
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu,
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        # Two 2x2 poolings reduce 28x28 to 7x7, with 64 channels after conv2
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        output = tf.nn.softmax(x)
        return output

        
    
    
num_epochs = 5
batch_size = 50
learning_rate = 0.001
model = MLP()        # swap in CNN() for the convolutional model
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
summary_writer = tf.summary.create_file_writer('./tensorboard')

num_batches = int(data_loader.num_train_data // batch_size * num_epochs)

for batch_index in range(num_batches):
    X, y = data_loader.get_batch(batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(X)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
        print("batch %d: loss %f" % (batch_index, loss.numpy()))
        with summary_writer.as_default():                      # select the writer
            tf.summary.scalar("loss", loss, step=batch_index)  # log the current loss value
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))


    
sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batches = int(data_loader.num_test_data // batch_size)
for batch_index in range(num_batches):
    start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
    y_pred = model.predict(data_loader.test_data[start_index:end_index])
    sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index:end_index], y_pred=y_pred)
print("test accuracy : %f" % sparse_categorical_accuracy.result())

#model.save('1.tf')
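
As an aside, the same training and evaluation can be written much more compactly with Keras's high-level compile/fit API; this is a minimal equivalent sketch of my own, not the tutorial's code. (The loss logged above can be viewed by running tensorboard --logdir ./tensorboard.)

model = MLP()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),  # the model already outputs softmax probabilities
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)
model.fit(data_loader.train_data, data_loader.train_label, epochs=5, batch_size=50)
model.evaluate(data_loader.test_data, data_loader.test_label)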

2.2 Problems and Solutions

The biggest difficulty I ran into was reading the dataset. Example code and other online materials tend to use MNIST, which ships with a built-in loader; working with a dataset that has no built-in loader takes some real programming, and that was what troubled me most. Extracting the notMNIST archive produces 10 folders, each named after its label. In general, a small dataset can be read into memory all at once, while a large one is usually streamed from disk; TensorFlow recommends the TFRecord format for that, but I repeatedly failed to get it working under version 2.0 and fell back to loading everything into memory. The data handling mainly follows a Kaggle kernel. Here is the final source code:

# -*- coding: utf-8 -*-
"""
Created on Sat Nov  9 19:41:27 2019

@author: 騰飛
"""

import numpy as np 
import matplotlib.pyplot as plt
import tensorflow as tf
import os

dir_ = 'E:/tfrecord/notMNIST_small/'
letters = os.listdir(dir_)

# Retrieve pictures files names
pictures_files = {}
for letter in letters:
    images = [name for name in os.listdir(dir_ + '%s/' % letter) if name[-4:] == '.png']
    pictures_files[letter] = images
# Get the actual pictures
data = {}
for letter in letters:
    print('---------------------------')
    print('Retrieving for %s' % letter)
    print('---------------------------')
    images = []
    for name in pictures_files[letter]:
        try:
            images.append(plt.imread(dir_+'{}/{}'.format(letter, name)))
        except Exception as e:
            # notMNIST contains a few unreadable image files; skip them
            print(e, name)
    data[letter] = images
print('Done')



from sklearn.preprocessing import LabelEncoder

# Merge all data into one list
X = []
Y = []
for key, list_ in data.items():
    for img in list_:
        X.append(img)
        Y.append(key)

# Encode the letter labels 'A'-'J' as integers 0-9 (sparse labels, no one-hot needed)
lbl_enc = LabelEncoder()
labels = np.array(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'])
lbl_enc.fit(labels)
Y_nd = lbl_enc.transform(Y)
# Add a channel dimension: [num, 28, 28] -> [num, 28, 28, 1]
X_nd = np.expand_dims(np.array(X), -1).astype('float32') / 255.0

from sklearn.model_selection import train_test_split
X_train, X_dev, Y_train, Y_dev = train_test_split(X_nd, Y_nd, test_size=.2)
X_train, X_test, Y_train, Y_test = train_test_split(X_train, Y_train, test_size=.1)
print('Training size: %s' % len(X_train))
print('DevSet size: %s' % len(X_dev))
print('TestSet size: %s' % len(X_test))
assert len(X_train) == len(Y_train)

print(Y_train.shape)
print(X_train.shape)




def get_batch(batch_size):
    # Randomly sample a batch from the training set
    index = np.random.randint(0, np.shape(X_train)[0], batch_size)
    return X_train[index, :], Y_train[index]

class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu,
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu,
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        # Two 2x2 poolings reduce 28x28 to 7x7, with 64 channels after conv2
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        output = tf.nn.softmax(x)
        return output

       
batch_size = 50
learning_rate = 0.001
model = CNN()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
summary_writer = tf.summary.create_file_writer('./tensorboard')
checkpoint = tf.train.Checkpoint(myAwesomeModel=model)

num_iterations = 20000  # total training iterations (batches)
for batch_index in range(num_iterations):
    X, y = get_batch(batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(X)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
        print("batch %d: loss %f" % (batch_index, loss.numpy()))
        with summary_writer.as_default():                      # select the writer
            tf.summary.scalar("loss", loss, step=batch_index)  # log the current loss value
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
    if batch_index % 1000 == 0:
        # Periodically checkpoint the model weights
        path = checkpoint.save('./save/ctf_model_v.ckpt')
        print("model saved to %s" % path)
    



#model.save('1.tf')
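
For completeness, here is a rough sketch of the TFRecord route mentioned above, which I never got working at the time. This is an untested outline against the TF 2.x API (the file name notmnist.tfrecord is just an example), not code from this project:

# Untested sketch: write the in-memory arrays to a TFRecord file...
with tf.io.TFRecordWriter('notmnist.tfrecord') as writer:
    for img, label in zip(X_nd, Y_nd):
        feature = {
            'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.tobytes()])),
            'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(label)])),
        }
        example = tf.train.Example(features=tf.train.Features(feature=feature))
        writer.write(example.SerializeToString())

# ...and read it back as a batched tf.data.Dataset
feature_description = {
    'image': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64),
}

def _parse(record):
    parsed = tf.io.parse_single_example(record, feature_description)
    # X_nd was stored as float32, so decode the raw bytes back as float32
    image = tf.reshape(tf.io.decode_raw(parsed['image'], tf.float32), [28, 28, 1])
    return image, parsed['label']

dataset = tf.data.TFRecordDataset('notmnist.tfrecord').map(_parse).shuffle(1024).batch(50)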
        
        

In addition, I wrote a short program to evaluate the model; I ran it as a separate test script. The main code:

model_to_be_restored = CNN()
# The key must match the one used when saving (myAwesomeModel)
checkpoint = tf.train.Checkpoint(myAwesomeModel=model_to_be_restored)
checkpoint.restore(tf.train.latest_checkpoint('./save'))

# Predicted class = index of the largest softmax probability
y_pred = np.argmax(model_to_be_restored.predict(X_test), axis=-1)
num_test = Y_test.shape[0]
num_correct = int(np.sum(y_pred == Y_test))
print("the acc is %f" % (num_correct / num_test))

3. Some Additional Figures

3.1 The dataset after extraction (figure)
3.2 Sample images from the dataset (figure)
3.3 Network architecture (figure)
3.4 Loss curve during training (figure)
3.5 Test accuracy (figure)

4. Notes

This program is only the final project for an elective course, and it is admittedly rough: because of how the data is read and the resources I had, I only trained and tested on the notMNIST_small dataset. Still, the overall code is simple and readable, and I hope it can serve as a reference. As for my experience with TensorFlow 2.0, it is very friendly; setting aside the dataset-handling issue, this experiment went smoothly. If I study this direction further, I will update this write-up.

