浙江財經大學專業實踐深度學習tensorflow——陽誠磚
1.案例描述
使用卷積神經網絡對CIFAR-10數據集進行分類
2.CIFAR-10數據集
2.1 下載CIFAR-10數據集
import urllib.request
import os
import tarfile
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
print(tf.__version__)
print(tf.test.is_gpu_available())
# 下載
url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
filepath = 'data/cifar-10-python.tar.gz'
if not os.path.isfile(filepath):
result=urllib.request.urlretrieve(url,filepath)
print('downloaded:',result)
else:
print('Data file already exists.')
# 解壓
if not os.path.exists("data/cifar-10-batches-py"):
tfile = tarfile.open("data/cifar-10-python.tar.gz", 'r:gz')
result=tfile.extractall('data/')
print('Extracted to ./data/cifar-10-batches-py/')
else:
print('Directory already exists.')
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<ipython-input-1-80a0a8945cc4> in <module>()
4 import os
5 os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
----> 6 print(tf.__version__)
7 print(tf.test.is_gpu_available())
8
NameError: name 'tf' is not defined
2.2 導入CIFAR-10數據集
import os
import numpy as np
import pickle as p
import tensorflow as tf
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
print(tf.__version__)
print(tf.test.is_gpu_available())
def load_CIFAR_batch(filename):
""" load single batch of cifar """
with open(filename, 'rb')as f:
# 一個樣本由標簽和圖像數據組成
# <1 x label><3072 x pixel> (3072=32x32x3)
# ...
# <1 x label><3072 x pixel>
data_dict = p.load(f, encoding='bytes')
images= data_dict[b'data']
labels = data_dict[b'labels']
# 把原始數據結構調整為: BCWH
images = images.reshape(10000, 3, 32, 32)
# tensorflow處理圖像數據的結構:BWHC
# 把通道數據C移動到最后一個維度
images = images.transpose (0,2,3,1)
labels = np.array(labels)
return images, labels
def load_CIFAR_data(data_dir):
"""load CIFAR data"""
images_train=[]
labels_train=[]
for i in range(5):
f=os.path.join(data_dir,'data_batch_%d' % (i+1))
print('loading ',f)
# 調用 load_CIFAR_batch( )獲得批量的圖像及其對應的標簽
image_batch,label_batch=load_CIFAR_batch(f)
images_train.append(image_batch)
labels_train.append(label_batch)
Xtrain=np.concatenate(images_train)
Ytrain=np.concatenate(labels_train)
del image_batch ,label_batch
Xtest,Ytest=load_CIFAR_batch(os.path.join(data_dir,'test_batch'))
print('finished loadding CIFAR-10 data')
# 返回訓練集的圖像和標簽,測試集的圖像和標簽
return Xtrain,Ytrain,Xtest,Ytest
data_dir = 'data/cifar-10-batches-py/'
Xtrain,Ytrain,Xtest,Ytest = load_CIFAR_data(data_dir)
2.3 顯示數據集信息
print('training data shape:',Xtrain.shape)
print('training labels shape:',Ytrain.shape)
print('test data shape:',Xtest.shape)
print('test labels shape:',Ytest.shape)
2.4 查看單項image和label
%matplotlib inline
import matplotlib.pyplot as plt
# 查看image
plt.imshow(Xtrain[34])
# 查看label
# 對應類別信息可查看:http://www.cs.toronto.edu/~kriz/cifar.html
print(Ytrain[34])
2.5 查看多項images與label
import matplotlib.pyplot as plt
#定義標簽字典,每一個數字所代表的圖像類別的名稱
label_dict={0:"airplane",1:"automobile",2:"bird",3:"cat",4:"deer",
5:"dog",6:"frog",7:"horse",8:"ship",9:"truck"}
#定義顯示圖像數據及其對應標簽的函數
def plot_images_labels_prediction(images,labels,prediction,idx,num=10):
fig = plt.gcf()
fig.set_size_inches(12, 6)
if num>10:
num=10
for i in range(0, num):
ax=plt.subplot(2,5, 1+i)
ax.imshow(images[idx],cmap='binary')
title=str(i)+','+label_dict[labels[idx]]
if len(prediction)>0:
title+='=>'+label_dict[prediction[idx]]
ax.set_title(title,fontsize=10)
idx+=1
plt.show()
# 顯示圖像數據及其對應標簽
plot_images_labels_prediction(Xtest,Ytest,[],1,10)
3. 數據預處理
3.1 圖像數據預處理
#查看圖像數據信息
#顯示第一個圖的第一個像素點
Xtrain[0][0][0]
# 將圖像進行數字標准化
Xtrain_normalize = Xtrain.astype('float32') / 255.0
Xtest_normalize = Xtest.astype('float32') / 255.0
# 查看預處理后圖像數據信息
Xtrain_normalize[0][0][0]
3.2 標簽數據預處理
# 查看標簽數據
Ytrain[:10]
# 獨熱編碼
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(sparse=False)
yy =[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]
encoder.fit(yy)
Ytrain_reshape = Ytrain.reshape(-1, 1)
Ytrain_onehot = encoder.transform(Ytrain_reshape)
Ytest_reshape = Ytest.reshape(-1,1)
Ytest_onehot = encoder.transform(Ytest_reshape)
# 顯示編碼后的情況
Ytrain_onehot.shape
Ytrain[:5]
Ytrain_onehot[:5]
4. 建立CIFAR-10圖像分類模型
# import tensorflow as tf
# tf.reset_default_graph()
# import os
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# print(tf.__version__)
# print(tf.test.is_gpu_available())
4.1 定義共享函數
# 定義權值
def weight(shape):
# 在構建模型時,需要使用tf.Variable來創建一個變量
# 在訓練時,這個變量不斷更新
# 使用函數tf.truncated_normal(截斷的正態分布)生成標准差為0.1的隨機數來初始化權值
return tf.Variable(tf.truncated_normal(shape, stddev=0.1), name ='W')
# 定義偏置
# 初始化為0.1
def bias(shape):
return tf.Variable(tf.constant(0.1, shape=shape), name = 'b')
# 定義卷積操作
# 步長為1,padding為'SAME'
def conv2d(x, W):
# tf.nn.conv2d(input, filter, strides, padding, use_cudnn_on_gpu=None, name=None)
return tf.nn.conv2d(x, W, strides=[1,1,1,1], padding='SAME')
# 定義池化操作
# 步長為2,即原尺寸的長和寬各除以2
def max_pool_2x2(x):
# tf.nn.max_pool(value, ksize, strides, padding, name=None)
return tf.nn.max_pool(x, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')
4.2 定義網絡結構
# 輸入層
# 32x32圖像,通道為3(RGB)
with tf.name_scope('input_layer'):
x = tf.placeholder('float',shape=[None, 32, 32, 3],name="x")
# 第1個卷積層
# 輸入通道:3,輸出通道:32,卷積后圖像尺寸不變,依然是32x32
with tf.name_scope('conv_1'):
W1 = weight([3,3,3,32]) # [k_width, k_height, input_chn, output_chn]
b1 = bias([32]) # 與output_chn 一致
conv_1=conv2d(x, W1)+ b1
conv_1 = tf.nn.relu(conv_1 )
# 第1個池化層
# 將32x32圖像縮小為16x16,池化不改變通道數量,因此依然是32個
with tf.name_scope('pool_1'):
pool_1 = max_pool_2x2(conv_1)
# 第2個卷積層
# 輸入通道:32,輸出通道:64,卷積后圖像尺寸不變,依然是16x16
with tf.name_scope('conv_2'):
W2 = weight([3,3,32,64])
b2 = bias([64])
conv_2=conv2d(pool_1, W2)+ b2
conv_2 = tf.nn.relu(conv_2)
# 第2個池化層
# 將16x16圖像縮小為8x8,池化不改變通道數量,因此依然是64個
with tf.name_scope('pool_2'):
pool_2 = max_pool_2x2(conv_2)
# 全連接層
# 將池第2個池化層的64個8x8的圖像轉換為一維的向量,長度是 64*8*8=4096
# 128個神經元
with tf.name_scope('fc'):
W3= weight([4096, 128]) #有128個神經元
b3= bias([128])
flat = tf.reshape(pool_2, [-1, 4096])
h = tf.nn.relu(tf.matmul(flat, W3) + b3)
h_dropout= tf.nn.dropout(h, keep_prob=0.8)
# 輸出層
# 輸出層共有10個神經元,對應到0-9這10個類別
with tf.name_scope('output_layer'):
W4 = weight([128,10])
b4 = bias([10])
pred= tf.nn.softmax(tf.matmul(h_dropout, W4)+b4)
4.3 構建模型
with tf.name_scope("optimizer"):
#定義占位符
y = tf.placeholder("float", shape=[None, 10],
name="label")
# 定義損失函數
loss_function = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits
(logits=pred ,
labels=y))
# 選擇優化器
optimizer = tf.train.AdamOptimizer(learning_rate=0.0001) \
.minimize(loss_function)
4.4 定義准確率
with tf.name_scope("evaluation"):
correct_prediction = tf.equal(tf.argmax(pred, 1),
tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
5.訓練模型
5.1 啟動會話
import os
from time import time
train_epochs =25
batch_size = 50
total_batch = int(len(Xtrain)/batch_size)
epoch_list=[];accuracy_list=[];loss_list=[];
epoch = tf.Variable(0,name='epoch',trainable=False)
startTime=time()
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
5.2 斷點續訓
# 設置檢查點存儲目錄
ckpt_dir = "CIFAR10_log/"
if not os.path.exists(ckpt_dir):
os.makedirs(ckpt_dir)
#生成saver
saver = tf.train.Saver(max_to_keep=1)
# 如果有檢查點文件,讀取最新的檢查點文件,恢復各種變量值
ckpt = tf.train.latest_checkpoint(ckpt_dir )
if ckpt != None:
saver.restore(sess, ckpt) #加載所有的參數
# 從這里開始就可以直接使用模型進行預測,或者接着繼續訓練了
else:
print("Training from scratch.")
# 獲取續訓參數
start = sess.run(epoch)
print("Training starts form {} epoch.".format(start+1))
5.3 迭代訓練
def get_train_batch(number, batch_size):
return Xtrain_normalize[number*batch_size:(number+1)*batch_size],\
Ytrain_onehot[number*batch_size:(number+1)*batch_size]
for ep in range(start,train_epochs):
for i in range(total_batch):
batch_x,batch_y = get_train_batch(i,batch_size) # 讀取批次數據
sess.run(optimizer,feed_dict = {x:batch_x, y:batch_y}) # 執行批次訓練
if i % 100 == 0:
print("Step {}".format(i),"finished")
#total_batch個批次訓練完成后 使用驗證數據計算誤差與准確率
loss,acc = sess.run([loss_function,accuracy],feed_dict = {x:batch_x, y:batch_y})
epoch_list.append(ep + 1)
loss_list.append(loss);
accuracy_list.append(acc)
# 打印訓練過程中的詳細信息
print("Train Epoch:",'%02d' % (sess.run(epoch) + 1),\
"Loss = ","{:.6f}".format(loss),"Accuracy = ",acc)
#保存檢查點
saver.save(sess,ckpt_dir + "CIFAR10_cnn_model.cpkt",global_step = ep + 1)
sess.run(epoch.assign(ep + 1))
#顯示運行總時間
duration = time() - startTime
print("Train Finished takes : ",duration)
5.4 可視化損失值
%matplotlib inline
import matplotlib.pyplot as plt
fig = plt.gcf()
fig.set_size_inches(4,2)
plt.plot(epoch_list, loss_list, label = 'loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['loss'], loc='upper right')
5.5 可視化准確率
plt.plot(epoch_list, accuracy_list,label="accuracy" )
fig = plt.gcf()
fig.set_size_inches(4,2)
plt.ylim(0.1,1)
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend()
plt.show()
6. 評估模型及預測
6.1 計算測試集上的准確率
test_total_batch = int(len(Xtest_normalize)/batch_size)
test_acc_sum = 0.0
for i in range(test_total_batch):
test_image_batch = Xtest_normalize[i*batch_size:(i+1)*batch_size]
test_label_batch = Ytest_onehot[i*batch_size:(i+1)*batch_size]
test_batch_acc = sess.run(accuracy, feed_dict = {x:test_image_batch,y:test_label_batch})
test_acc_sum += test_batch_acc
test_acc = float(test_acc_sum/test_total_batch)
print("Test accuracy:{:.6f}".format(test_acc))
6.2 利用模型進行預測
test_pred=sess.run(pred, feed_dict={x: Xtest_normalize[:10]})
prediction_result = sess.run(tf.argmax(test_pred,1))
6.3 可視化預測結果
plot_images_labels_prediction(Xtest,Ytest,prediction_result,0,10)