在第十三節,我們已經介紹了使用帶有全局平均池化層的CNN對CIFAR10數據集分類,在學習了反卷積神經網絡之后我們把第十三節那個程序里的卷積層可視化出來。
一 替換掉tf.nn.max_pool()函數
這里不再使用自己定義的max_pool_2x2函數,改成新加入的帶有mask返回值得max_pool_with_argmax()函數:
#定義占位符 使用反卷積函數,占位符中必須指定具體數,不能指定None input_x = tf.placeholder(dtype=tf.float32,shape=[batch_size,24,24,3]) #圖像大小24x24x input_y = tf.placeholder(dtype=tf.float32,shape=[batch_size,10]) #0-9類別 x_image = tf.reshape(input_x,[batch_size,24,24,3]) #不能傳入-1 否則會報錯 #1.卷積層 ->池化層 W_conv1 = weight_variable([5,5,3,64]) b_conv1 = bias_variable([64]) h_conv1 = tf.nn.relu(conv2d(x_image,W_conv1) + b_conv1) #輸出為[-1,24,24,64] print_op_shape(h_conv1) h_pool1,mask1 = max_pool_with_argmax(h_conv1,2) #輸出為[-1,12,12,64] print_op_shape(h_pool1) #2.卷積層 ->池化層 W_conv2 = weight_variable([5,5,64,64]) b_conv2 = bias_variable([64]) h_conv2 = tf.nn.relu(conv2d(h_pool1,W_conv2) + b_conv2) #輸出為[-1,12,12,64] print_op_shape(h_conv2) h_pool2,mask2 = max_pool_with_argmax(h_conv2,2) #輸出為[-1,6,6,64] print_op_shape(h_pool2)
二 反卷積第二層卷積結果
以第二池化輸出的變量h_pool2為開始部分,沿着h_pool2生成的方式反向操作一層一層推導,直至生成原始圖t1_x_image。
如圖所示,上半部分是h_pool2卷積過程,下半部分為反卷積過程,為了分析方便,下半部分名稱與代碼中的變量一致。
#3 反卷積第二層卷積結果 t_conv2 = un_max_pool(h_pool2,mask2,2) print_op_shape(t_conv2) #輸出為[128,12,12,64] t_pool1 = tf.nn.conv2d_transpose(t_conv2 - b_conv2,W_conv2,output_shape=h_pool1.shape,strides=[1,1,1,1],padding='SAME') print_op_shape(t_pool1) #輸出為[128,12,12,64] t_conv1 = un_max_pool(t_pool1,mask1,2) print_op_shape(t_conv1) #輸出為[128,24,24,64] t_x_image = tf.nn.conv2d_transpose(t_conv1 - b_conv1,W_conv1,output_shape=x_image.shape,strides=[1,1,1,1],padding='SAME') #生成原始圖 print_op_shape(t_x_image) #輸出為[128,24,25,3]
因為在卷積過程中,每個卷積后都要加上權重b,所以在反卷積過程中就要將b減去,由於Relu函數基本上恆等變化(除了小於0的部分),所以在反向時不需要可逆操作,可以直接忽略去。
三 反卷積第一層卷積結果
#4 反卷積第一層卷積結果 t1_conv1 = un_max_pool(h_pool1,mask1,2) print_op_shape(t1_conv1) t1_x_image = tf.nn.conv2d_transpose(t1_conv1 - b_conv1,W_conv1,output_shape=x_image.shape,strides=[1,1,1,1],padding='SAME') #生成原始圖 print_op_shape(t1_x_image)
四 合並還原結果,並使用TensorFlow輸出
這次是將結果通過TensorBoard進行展示,所以將生成第一層圖像和第二層圖像與原始圖像和在一起,統一放在tf.summary.image()里,這樣在TensorBoard的image就可以看到圖像了。
#合並還原結果,並輸出給TensorBoard輸出 stictched_decodings = tf.concat((x_image,t1_x_image,t_x_image),axis=2) #圖像數據匯總,並命名為'source/cifar' decoding_summary_op = tf.summary.image('source/cifar',stictched_decodings)
五 session中寫入log
在session中建立一個summary_writer,然后在代碼結尾處通過session.run()運行前面的tf.summary.image操作,使用summary_writer將得出的結果寫入log。
''' 四 開始訓練 ''' sess = tf.Session(); sess.run(tf.global_variables_initializer()) #創建summary_write,用於寫文件 summary_writer = tf.summary.FileWriter('./summary_log',sess.graph) # 啟動計算圖中所有的隊列線程 調用tf.train.start_queue_runners來將文件名填充到隊列,否則read操作會被阻塞到文件名隊列中有值為止。 tf.train.start_queue_runners(sess=sess) for step in range(training_step): #獲取batch_size大小數據集 image_batch,label_batch = sess.run([images_train,labels_train]) #one hot編碼 label_b = np.eye(10,dtype=np.float32)[label_batch] #開始訓練 train.run(feed_dict={input_x:image_batch,input_y:label_b},session=sess) if step % display_step == 0: train_accuracy = accuracy.eval(feed_dict={input_x:image_batch,input_y:label_b},session=sess) print('Step {0} tranining accuracy {1}'.format(step,train_accuracy)) ''' 五 開始測試 ''' image_batch, label_batch = sess.run([images_test, labels_test]) label_b = np.eye(10,dtype=float)[label_batch] print ("finished! test accuracy %g"%accuracy.eval(feed_dict={ input_x:image_batch, input_y: label_b},session=sess)) ''' 六 寫summary日志 ''' #生成summary decoding_summary = sess.run(decoding_summary_op,feed_dict={input_x:image_batch, input_y: label_b}) #將summary寫入文件 summary_writer.add_summary(decoding_summary)
完整代碼:

# -*- coding: utf-8 -*- """ Created on Thu May 3 12:29:16 2018 @author: zy """ ''' 建立一個帶有全局平均池化層的卷積神經網絡 並對CIFAR-10數據集進行分類 然后反卷積,可視化查看 注意這個程序只能運行在GPU機器上 ''' import cifar10_input import tensorflow as tf import numpy as np def weight_variable(shape): ''' 初始化權重 args: shape:權重shape ''' initial = tf.truncated_normal(shape=shape,mean=0.0,stddev=0.01) return tf.Variable(initial) def bias_variable(shape): ''' 初始化偏置 args: shape:偏置shape ''' initial =tf.constant(0.01,shape=shape) return tf.Variable(initial) def conv2d(x,W): ''' 卷積運算 ,使用SAME填充方式 卷積層后 out_height = in_hight / strides_height(向上取整) out_width = in_width / strides_width(向上取整) args: x:輸入圖像 形狀為[batch,in_height,in_width,in_channels] W:權重 形狀為[filter_height,filter_width,in_channels,out_channels] ''' return tf.nn.conv2d(x,W,strides=[1,1,1,1],padding='SAME') def max_pool_2x2(x): ''' 最大池化層,濾波器大小為2x2,'SAME'填充方式 池化層后 out_height = in_hight / strides_height(向上取整) out_width = in_width / strides_width(向上取整) args: x:輸入圖像 形狀為[batch,in_height,in_width,in_channels] ''' return tf.nn.max_pool(x,ksize=[1,2,2,1],strides=[1,2,2,1],padding='SAME') def max_pool_with_argmax(net,stride): ''' 重定義一個最大池化函數,返回最大池化結果以及每個最大值的位置(是個索引,形狀和池化結果一致) 'SAME'填充方式 池化層后 out_height = in_hight / strides_height(向上取整) out_width = in_width / strides_width(向上取整) args: net:輸入數據 形狀為[batch,in_height,in_width,in_channels] stride:步長,是一個int32類型,注意在最大池化操作中我們設置窗口大小和步長大小是一樣的 ''' #使用mask保存每個最大值的位置 這個函數只支持GPU操作 _, mask = tf.nn.max_pool_with_argmax( net,ksize=[1, stride, stride, 1], strides=[1, stride, stride, 1],padding='SAME') #將反向傳播的mask梯度計算停止 mask = tf.stop_gradient(mask) #計算最大池化操作 net = tf.nn.max_pool(net, ksize=[1, stride, stride, 1],strides=[1, stride, stride, 1], padding='SAME') #將池化結果和mask返回 return net,mask def un_max_pool(net,mask,stride): ''' 定義一個反最大池化的函數,找到mask最大的索引,將max的值填到指定位置 args: net:最大池化后的輸出,形狀為[batch, height, width, in_channels] mask:位置索引組數組,形狀和net一樣 stride:步長,是一個int32類型,這里就是max_pool_with_argmax傳入的stride參數 ''' ksize = [1, stride, stride, 1] input_shape = net.get_shape().as_list() # calculation new shape output_shape = (input_shape[0], input_shape[1] * ksize[1], input_shape[2] * ksize[2], input_shape[3]) # calculation indices for batch, height, width and feature maps one_like_mask = tf.ones_like(mask) batch_range = tf.reshape(tf.range(output_shape[0], dtype=tf.int64), shape=[input_shape[0], 1, 1, 1]) b = one_like_mask * batch_range y = mask // (output_shape[2] * output_shape[3]) x = mask % (output_shape[2] * output_shape[3]) // output_shape[3] feature_range = tf.range(output_shape[3], dtype=tf.int64) f = one_like_mask * feature_range # transpose indices & reshape update values to one dimension updates_size = tf.size(net) indices = tf.transpose(tf.reshape(tf.stack([b, y, x, f]), [4, updates_size])) values = tf.reshape(net, [updates_size]) ret = tf.scatter_nd(indices, values, output_shape) return ret def avg_pool_6x6(x): ''' 全局平均池化層,使用一個與原有輸入同樣尺寸的filter進行池化,'SAME'填充方式 池化層后 out_height = in_hight / strides_height(向上取整) out_width = in_width / strides_width(向上取整) args; x:輸入圖像 形狀為[batch,in_height,in_width,in_channels] ''' return tf.nn.avg_pool(x,ksize=[1,6,6,1],strides=[1,6,6,1],padding='SAME') def print_op_shape(t): ''' 輸出一個操作op節點的形狀 args: t:必須是一個tensor類型 t.get_shape()返回一個元組 .as_list()轉換為list ''' print(t.op.name,'',t.get_shape().as_list()) ''' 一 引入數據集 ''' batch_size = 128 learning_rate = 1e-4 training_step = 1500 display_step = 200 #數據集目錄 data_dir = './cifar10_data/cifar-10-batches-bin' print('begin') #獲取訓練集數據 images_train,labels_train = cifar10_input.inputs(eval_data=False,data_dir = data_dir,batch_size=batch_size) images_test, labels_test = cifar10_input.inputs(eval_data = True, data_dir = data_dir, batch_size = batch_size) print('begin data') ''' 二 定義網絡結構 ''' #定義占位符 使用反卷積的時候,這個形狀中不能帶有None,不然會報錯 input_x = tf.placeholder(dtype=tf.float32,shape=[batch_size,24,24,3]) #圖像大小24x24x input_y = tf.placeholder(dtype=tf.float32,shape=[batch_size,10]) #0-9類別 x_image = tf.reshape(input_x,[batch_size,24,24,3]) #不要傳入-1 否則會報錯 #1.卷積層 ->池化層 W_conv1 = weight_variable([5,5,3,64]) b_conv1 = bias_variable([64]) h_conv1 = tf.nn.relu(conv2d(x_image,W_conv1) + b_conv1) #輸出為[128,24,24,64] print_op_shape(h_conv1) h_pool1,mask1 = max_pool_with_argmax(h_conv1,2) #輸出為[128,12,12,64] #h_pool1 = max_pool_2x2(h_conv1) #輸出為[128,12,12,64] print_op_shape(h_pool1) #2.卷積層 ->池化層 W_conv2 = weight_variable([5,5,64,64]) b_conv2 = bias_variable([64]) h_conv2 = tf.nn.relu(conv2d(h_pool1,W_conv2) + b_conv2) #輸出為[128,12,12,64] print_op_shape(h_conv2) h_pool2,mask2 = max_pool_with_argmax(h_conv2,2) #輸出為[128,6,6,64] #h_pool2 = max_pool_2x2(h_conv2) #輸出為[128,6,6,64] print_op_shape(h_pool2) #3 反卷積第二層卷積結果 t_conv2 = un_max_pool(h_pool2,mask2,2) print_op_shape(t_conv2) #輸出為[128,12,12,64] t_pool1 = tf.nn.conv2d_transpose(t_conv2 - b_conv2,W_conv2,output_shape=h_pool1.shape,strides=[1,1,1,1],padding='SAME') print_op_shape(t_pool1) #輸出為[128,12,12,64] t_conv1 = un_max_pool(t_pool1,mask1,2) print_op_shape(t_conv1) #輸出為[128,24,24,64] t_x_image = tf.nn.conv2d_transpose(t_conv1 - b_conv1,W_conv1,output_shape=x_image.shape,strides=[1,1,1,1],padding='SAME') #生成原始圖 print_op_shape(t_x_image) #輸出為[128,24,25,3] #4 反卷積第一層卷積結果 t1_conv1 = un_max_pool(h_pool1,mask1,2) print_op_shape(t1_conv1) t1_x_image = tf.nn.conv2d_transpose(t1_conv1 - b_conv1,W_conv1,output_shape=x_image.shape,strides=[1,1,1,1],padding='SAME') #生成原始圖 print_op_shape(t1_x_image) #合並還原結果,並輸出給TensorBoard輸出 stictched_decodings = tf.concat((x_image,t1_x_image,t_x_image),axis=2) #stictched_decodings = x_image #圖像數據匯總,並命名為'source/cifar' decoding_summary_op = tf.summary.image('source/cifar',stictched_decodings) #5.卷積層 ->全局平均池化層 W_conv3 = weight_variable([5,5,64,10]) b_conv3 = bias_variable([10]) h_conv3 = tf.nn.relu(conv2d(h_pool2,W_conv3) + b_conv3) #輸出為[-1,6,6,10] print_op_shape(h_conv3) nt_hpool3 = avg_pool_6x6(h_conv3) #輸出為[-1,1,1,10] print_op_shape(nt_hpool3) nt_hpool3_flat = tf.reshape(nt_hpool3,[-1,10]) y_conv = tf.nn.softmax(nt_hpool3_flat) ''' 三 定義求解器 ''' #softmax交叉熵代價函數 cost = tf.reduce_mean(-tf.reduce_sum(input_y * tf.log(y_conv),axis=1)) #求解器 train = tf.train.AdamOptimizer(learning_rate).minimize(cost) #返回一個准確度的數據 correct_prediction = tf.equal(tf.arg_max(y_conv,1),tf.arg_max(input_y,1)) #准確率 accuracy = tf.reduce_mean(tf.cast(correct_prediction,dtype=tf.float32)) ''' 四 開始訓練 ''' sess = tf.Session(); sess.run(tf.global_variables_initializer()) #創建summary_write,用於寫文件 summary_writer = tf.summary.FileWriter('./summary_log',sess.graph) # 啟動計算圖中所有的隊列線程 調用tf.train.start_queue_runners來將文件名填充到隊列,否則read操作會被阻塞到文件名隊列中有值為止。 tf.train.start_queue_runners(sess=sess) for step in range(training_step): #獲取batch_size大小數據集 image_batch,label_batch = sess.run([images_train,labels_train]) #one hot編碼 label_b = np.eye(10,dtype=np.float32)[label_batch] #開始訓練 train.run(feed_dict={input_x:image_batch,input_y:label_b},session=sess) if step % display_step == 0: train_accuracy = accuracy.eval(feed_dict={input_x:image_batch,input_y:label_b},session=sess) print('Step {0} tranining accuracy {1}'.format(step,train_accuracy)) ''' 五 開始測試 ''' image_batch, label_batch = sess.run([images_test, labels_test]) label_b = np.eye(10,dtype=float)[label_batch] print ("finished! test accuracy %g"%accuracy.eval(feed_dict={ input_x:image_batch, input_y: label_b},session=sess)) ''' 六 寫summary日志 ''' #生成summary decoding_summary = sess.run(decoding_summary_op,feed_dict={input_x:image_batch, input_y: label_b}) #將summary寫入文件 summary_writer.add_summary(decoding_summary)