Tensorflow基礎知識
Tensorflow設計理念
(1)將圖的定義和圖的運行完全分開,因此Tensorflow被認為是一個“符合主義”的庫
(2)Tensorflow中涉及的運算都要放在圖中,而圖的運行只發生在會話(session)中。開啟會話后,就可以用數據去填充節點,進行運算。關閉會話后,就不能繼續計算了。因此會話提供了操作運算和Tensor求值的環境
編程模型
邊
Tensorflow的邊有兩種連接關系:數據依賴和控制依賴。其中實線邊表示數據依賴,代表數據,即張量。張量在數據流圖中從前往后流動一遍就完成了一次前向傳播,而殘差從后向前流動一編就完成了一次反向傳播。
還有一種特殊邊,一般畫為虛線邊,稱為控制依賴,可以用於控制操作的運算,這被用來確保happens-before關系,這類邊上沒有數據流過,但源節點必須在目的節點開始執行前完成執行。
Tensorflow支持的張量數據屬性
數據類型 Python類型
DT_FLOAT tf.float32 32位浮點型
DT_DOUBLE tf.float64 64位浮點型
DT_INT64 tf.int64 64位有符號整型
DT_INT32 tf.int32 32位有符號整型
DT_INT16 tf.int16 16位有符號整型
DT_INT8 tf.int8 8位有符號整型
DT_UINT8 tf.uint8 8位無符號整型
DT_STRING tf.string 可變長度的字節數組
DT_BOOL tf.bool 布爾型
DT_COMPLEX64 tf.complex64 由兩個32位浮點數組成的復數
DT_QINT32 tf.qint32 用於量化操作的32位有符號整型
DT_QINT8 tf.qint8 用於量化操作的8位有符號整型
DT_QUINT8 tf.quint8 用於量化操作的8位無符號整型
節點
圖中的節點又稱為算子,它代表一個操作(op),一般用來表示施加的數學運算,也可以表示數據輸入(feed in)的起點以及輸出(push out)的終點,或者是讀取/寫入持久變量的終點
下面列舉了一些Tensorflow實現的算子
類別 示例
數學運算操作 Add,Sub,Mul,Div,Exp,Log,Greater,Less,Equal
數組運算操作 Concat,Slice,Split,Constant,Rank,Shape,Shuffle
矩陣運算操作 MatMul,MatrixInverse,MatrixDeterminant
有狀態的操作 Variable,Assign,AssignAdd
神經網絡構建操作 SoftMax,Sigmoid,ReLU,Convolution2D,MaxPool
檢查點操作 Save,Restore
隊列和同步操作 Enqueue,Dequeue,MutexAcquire,MutexRelease
控制張量流動的操作 Merge,Switch,Enter,Leave,NextIteration
其他概念
除了邊和節點,Tensorflow還涉及其他一些概念,如圖、會話、設備、變量、內核等。
1.圖
把操作任務描述成有向無環圖,那么如何構建圖呢?構建圖的第一步就是創建各個節點
2.會話
啟動圖的第一步是創建一個Session對象。會話(Session)提供在圖中執行操作的一些方法。一般的模式是,建立會話,此時會產生一張空圖,在會話中添加節點和邊,形成一張圖,然后執行
在調用Session對象的run()方法來執行圖時,傳入一些Tensor,這個過程叫填充(feed);返回的結果類型根據輸入的類型而定,這個過程叫取回(fetch)
會話是圖交互的一個橋梁,一個會話可以有多個圖,會話可以修改圖的結構,也可以往圖中注入數據進行計算。因此,會話主要有兩個API接口:Extend和Run.Extend操作是在Graph中添加節點和邊,Run操作是輸入計算的節點和填充必要的數據后,進行運算,並輸出運算結果。
3.設備
設備(device)是指一塊可以用來運算並且擁有自己的地址空間的硬件,如CPU和GPU.Tensorflow為了實現分布式執行操作,充分利用計算資源,可以明確指定操作在哪個設備上執行。
4.變量
變量是一種特殊的數據,它在圖中有固定的位置,不像普通張量那樣可以流動。例如,創建一個變量張量,使用tf.Variable()構造函數,這個構造函數需要一個初始值
Tensorflow還提供填充機制,可以在構建圖時使用tf.placeholder()臨時替代任意操作的張量,在調用Session對象的run()去執行圖時,使用填充數據作為調用的參數,調用結束后,填充數據就消失。
import tensorflow as tf input1=tf.placeholder(tf.float32) input2=tf.placeholder(tf.float32) output=tf.multiply(input1,input2) with tf.Session() as sess: print(sess.run([output],feed_dict={input1:[7.],input2:[2.]}))
5.內核
我們知道操作是對抽象操作的一個統稱,而內核則是能夠運行在特定設備(如CPU,GPU)上的一種對操作的實現。因此,同一個操作可能會對應多個內核。
當自定義一個操作時,需要把新操作和內核通過注冊的方式添加到系統中。
常用API
圖、操作和張量
Tensorflow的計算表現位數據流圖,所以tf.Graph類中包含一系列表示計算的操作對象(tf.Operation),以及在操作之間流動的數據——張量對象(tf.Tensor)。與圖相關的API均位於tf.Graph類中
tf.Graph._init_() 創建一個空圖
tf.Graph.as_default() 將某圖設置為默認圖,並返回一個上下文管理器。如果不顯式添加一個默認圖,系統會自動設置一個全局的默認圖。所設置的默認圖,在模塊范圍內定義的節點都將默認加入默認圖中
tf.Graph.device(device_name_or_function) 定義運行圖所使用的設備,並返回一個上下文管理器
tf.Graph.name_scope(name) 為節點創建層次化的名稱,並返回一個上下文管理器。
tf.Operation類代表圖中的一個節點,用於計算張量數據。該類型由節點構造器產生。
與操作相關的API均位於tf.Operation類中
tf.Operation.name 操作名稱
tf.Operation.type 操作類型
tf.Operation.inputs 操作的輸入
tf.Operation.outputs 操作的輸出
tf.Operation.control_inputs 操作的依賴
tf.Operation.run(feed_dict=None,session=None) 在會話中運行該操作
tf.Operation.get_attr(name) 獲取操作的屬性值
tf.Tensor類是操作輸出的符號句柄,它不包含操作i輸出的值,而是提供了一種在tf.Session中計算這些值得方法。這樣就可以在操作之間構建一個數據流連接,使Tensorflow能夠執行一個表示大量多步計算的圖形
tf.Tensor.dtype 張量的數據類型
tf.Tensor.name 張量的名稱
tf.Tensor.value_index 張量在操作輸出中的索引
tf.Tensor.graph 張量所在的圖
tf.Tensor.op 產生該張量的操作
tf.Tensor.consumers() 返回使用該張量的操作列表
tf.Tensor.eval(feed_dict,session=None) 在會話中求張量的值,需要使用sess.as_default()或者eval(session=sess)
tf.Tensor.get_shape() 返回用於表示張量的形狀(維度)的類Tensorshape
tf.Tensor.set_shape() 更新張量的形狀
tf.Tensor.device 設置計算該張量的設備
可視化
可視化時,需要在程序中給必要的節點添加摘要(summary),摘要會收集該節點的數據,並標記上第幾步、時間戳等標識,寫入事件文件(event file)中。tf.summary.FileWriter類用於在目錄中創建事件文件,並且向文件中添加摘要和事件,用來在TensorBoard中展示
可視化常用API
tf.summary.FileWriter._init_(logdir,graph=None,max_queue=10,flush_secs=120,graph_def=None) 創建FileWriter和事件文件,會在logdir中創建一個新的事件文件
tf.summary.FileWriter.add_summary(summary,global_step=None) 將摘要添加到事件文件
tf.summary.FileWriter.add_event(event) 向事件文件中添加一個事件
tf.summary.FileWriter.add_graph(graph,global_step=None,graph_def=None) 向事件文件中添加一個圖
tf.summary.FileWriter.get_logdir 獲取事件文件的路徑
tf.summary.FileWriter.flush() 將所有事件都寫入磁盤
tf.summary.FileWriter.close() 將事件寫入磁盤,並關閉文件操作符
tf.summary.scalar(name,tensor,collections=None) 輸出包含單個標量值的摘要
tf.summary.histogram(name,values,collections=None) 輸出包含直方圖的摘要
tf.summary.audio(name,tensor,sample_rate,max_outputs=3,collections=None) 輸出包含音頻的摘要
tf.summary.image(name,tensor,max_outputs=3,collections=None) 輸出包含圖片的摘要
tf.summary.merge(inputs,collections=None,name=None) 合並摘要,包含所有輸入摘要的值
變量作用域
在Tensorflow中有兩個作用域(scope),一個是name_scope,另一個是variable_scope.兩者的區別是,variable_scope主要是給variable_name加前綴的,也可以給op_name加前綴;name_scope是給op_name加前綴。
variable_scope變量作用域機制在Tensorflow中主要由兩部分組成:
v=tf.get_variable(name,shape,dtype,initializer)#通過所給的名字創建或是返回一個變量
tf.variable_scope(<scope_name>)#為變量指定命名空間
當tf.get_variable_scope().reuse==False時,variable_scope作用域只能用來創建新變量
import tensorflow as tf with tf.variable_scope("foo"): v=tf.get_variable("v",[1]) v2=tf.get_variable("v",[1]) assert(v.name=="foo/v:0")
上面這個程序會報錯,因為v這個變量已經被定義過了,但tf.get_variable_scope()。reuse默認為False,所以不能重用
當tf.get_variable_scope().reuse==True時,作用域可以共享變量
import tensorflow as tf with tf.variable_scope("foo") as scope: v=tf.get_variable("v",[1]) with tf.variable_scope("foo",reuse=True): v1=tf.get_variable("v",[1]) assert(v1==v)
1.獲取變量作用域
可以直接通過tf.Variable_scope()來獲取變量作用域
如果在開啟一個變量作用域里使用之前預先定義的一個作用域,則會跳過當前變量的作用域,保持預先存在的作用域不變
import tensorflow as tf with tf.variable_scope("foo") as foo_scope: assert (foo_scope.name=="foo") with tf.variable_scope("bar"): with tf.variable_scope("baz") as other_scope: assert (other_scope.name=="bar/baz") with tf.variable_scope(foo_scope) as foo_scope2: assert(foo_scope2.name=="foo") #保持不變
變量作用域可以默認攜帶一個初始化器,在這個作用域中的子作用域或變量都可以繼承或者重寫父作用域初始化器中的值
import tensorflow as tf with tf.variable_scope("foo",initializer=tf.constant_initializer(0.4)): v=tf.get_variable("v",[1]) assert(v.eval()==0.4) w=tf.get_variable("w",[1],initializer=tf.constant_initializer(0.3)) assert(w.eval()==0.3) with tf.variable_scope("bar"): v=tf.get_variable("v",[1]) assert(v.eval()==0.4) with tf.variable_scope("baz",initializer=tf.constant_initializer(0.2)): v=tf.get_variable("v",[1]) assert(v.eval()==0.2)
上面講的是variable_name,那對於op_name呢?在variable_scope作用域下的操作,也會加上前綴
import tensorflow as tf with tf.variable_scope("foo"): x=1+tf.get_variable("v",[1]) assert x.op.name=="foo/add"
批標准化
批標准化是為了克服神經網絡層數加深導致難以訓練而誕生的。我們知道,深度神經網絡隨着網絡深度加深,訓練起來會越來越困難,收斂速度會很慢,常常會導致梯度彌散。
統計機器學習中有一個ICS理論,這是一個經典假設:源域和目標域的數據分布是一致的。也就是說,訓練數據和測試數據是滿足相同分布的。這是通過訓練數據獲得的模型能夠在測試集獲得好的效果的一個基本保障
covariate shift是指訓練集的樣本數據和目標樣本數據集分布不一致時,訓練得到的模型無法很好地泛化。它是分布不一致假設之下地一個分支問題,也就是指源域和目標域的條件概率是一致的,但是其邊緣概率不同。的確,對於神經網絡的各層輸出,在經過了層內操作之后,各層輸出分布就會與對應的輸入信號分布不同,而且差異會隨着網絡深度增大而加大,但是每一層所指向的樣本標記仍然是不變的。
解決思路一般是根據訓練樣本和目標樣本的比例對訓練樣本做一個矯正。因此,通過引入批標准化來規范化某些層或者所有層的輸入,從而固定每層輸入信號的均值與方差
方法
批標准化一般用在非線性映射(激活函數)之前,對x=Wu+b做規范化,使結果(輸出信號各個維度)的均值為0,方差為1.讓每一層的輸入有一個穩定的分布會有利於網絡的訓練。
優點
批標准化通過規范化讓激活函數分布在線性區間,結果就是加大了梯度,讓模型更加大膽地進行梯度下降,於是有如下優點:
1.加大探索的步長,加快收斂的速度
2.更容易跳出局部最小值
3.破壞原來的數據分布,一定程度上緩解過擬合
因此,在遇到神經網絡收斂速度很慢或者梯度爆炸等無法訓練的情況下,都可以嘗試用批標准化來解決。
示例:
fc_mean,fc_var=tf.nn.moments(Wx_plus_b,axes=[0],) scale=tf.Variable(tf.ones([out_size])) shift=tf.Variable(tf.zeros([out_size])) epsilon=0.001 Wx_plus_b=tf.nn.batch_normalization(Wx_plus_b,fc_mean,fc_var,shift,scale,epsilon) #也就是在做Wx_plus_b=(Wx_plus_b-fc_mean)/tf.sqrt(fc_var+0.001) #Wx_plus_b=Wx_plus_b*scale+shift
神經元函數及優化方法
激活函數
激活函數運行時激活神經網絡中某一部分神經元,將激活系信息向后傳入下一層的神經網絡。神經網絡之所以能解決非線性問題,本質上就是激活函數加入了非線性因素,彌補了線性模型的表達力,把“激活的神經元的特征”通過函數保留並映射到下一層。
因為神經網絡的數學基礎是處處可微,所以選取的激活函數要能保證數據輸入與輸出也是可微的。
激活函數不會更改輸入數據的維度,也就是輸入和輸出的維度是相同的。有如下的激活函數:
tf.nn.relu()
tf.nn.sigmoid()
tf.nn.tanh()
tf.nn.elu()
tf.nn.bias_add()
tf.nn.crelu()
tf.nn.relu6()
tf.nn.softplus()
tf.nn.softsign()
dropout函數。一個神經元將以概率keep_prob決定是否被抑制。如果被抑制,該神經元的輸出為0,如果不被抑制,那么該神經元的輸出值將被放大到原來的1/keep_prob倍
tf.nn.dropout()
卷積函數
卷積函數是構建神經網絡的重要支架,是在一批圖像上掃描二維過濾器。
tf.nn.convolution(inputs,filter,padding,strides=None,dilation_rate=None,name=None,data_format=None)這個函數計算N維卷積和
tf.nn.conv2d(input,filter,strides,padding,use_cudnn_on_gpu=None,data_format=None,name=None)這個函數的作用是對一個四維的輸入數據input和四維的卷積核filter進行操作,然后對輸入數據進行一個二維的卷積操作,最后得到卷積之后的結果。
tf.nn.depthwise_conv2d(input,filter,strides,padding,rate=None,name=None,data_format=None)這個函數輸入張量的數據維度是[batch,in_height,in_width,in_channels],卷積核的維度是[filter_height,filter_width,in_channels,channel_multiplier],在通道in_channels上面的卷積深度是1,depthwise_conv2d函數將不同的卷積核獨立地應用在in_channels的每個通道上(從通道1到channel_multiplier),然后把所有結果進行匯總。最后輸出通道的總數是in_channels*channel_multiplier
tf.nn.separable_conv2d(input,depthwise_filter,pointwise_filter,strides,padding,rate=None,name=None,data_format=None)是利用幾個分離的卷積核去做卷積。在這個API中,將應用一個二維的卷積核,在每個通道上,以深度channel_multiplier進行卷積。
tf.nn.atrous_conv2d(value,filters,rate,padding,name=None)計算Atrous卷積,又稱孔卷積或者擴張卷積
tf.nn.conv2d_transpose(value,filter,output_shape,strides,padding=’SAME’,data_format=’NHWC’,name=None)在解卷積網絡中有時稱為反卷積,但實際上是conv2d的轉置
tf.nn.conv1d(value,filter,stride,padding,use_cudnn_on_gpu=None,data_format=None,name=None)這個函數與二維卷積類似。這個函數是用來計算給定三維的輸入核過濾器的情況下的一維卷積。不同的是,它的輸入是三維,如[batch,in_width,in_channels]。stride是一個正整數,代表卷積核向右移動每一步的長度。
tf.nn.conv3d(input,filter,strides,padding,name=None)和二維卷積類似,這個函數用來計算給定五維的輸入和過濾器的情況下的三維卷積
池化函數
在神經網絡中,池化函數一般跟在卷積函數的下一層
tf.nn.avg_pool()計算池化區域中元素的平均值
tf,nn.max_pool()計算池化區域中元素的最大值
tf.nn.max_pool_with_argmax()計算池化區域中元素的最大值和該最大值所在的位置
tf.nn.avg_pool3d()和tf.max_pool3d()分別是在三維下的平均池化和最大池化
tf.nn.fractional_avg_pool()和tf.nn.fractional_max_pool()分別是三維下的平均池化和最大池化
tf.nn.pool()執行一個n維的池化操作
分類函數
Tensorflow中常見的分類函數主要有:sigmoid_cross_entropy_with_logits、softmax、log_softmax、softmax_cross_entropy_with_logits等
tf.nn.sigmoid_cross_entropy_with_logits這個函數的輸入要格外注意,如果采用此函數作為損失函數,在神經網絡的最后一層不需要進行sigmoid運算。
tf.nn.softmax 計算softmax激活
log_softmax 計算log softmax激活
softmax_cross_entropy_with_logits
優化方法
如何加速神經網絡的訓練?Tensorflow提供了很多優化器
1.BGD法
批梯度下降,這種方法是利用現有參數對訓練集中的每一個輸入生成一個估計輸出yi,然后跟實際輸出yi比較,統計所有誤差,求平均以后得平均誤差,以此作為更新參數得依據
2.SGD法
隨機梯度下降,這種方法是將數據集拆分成一個個批次,隨機抽取一個批次來計算並更新參數。(1)由於抽取不可避免地梯度會有誤差,需要手動調整學習率,但是選擇合適地學習率又比較困難。尤其是在訓練時,我們常常想對常出現地特征更新速度快一些,而對不常出現的特征更新速度慢一些,而SGD在更新參數時對所有參數采用一樣的學習率,因此無法滿足要求。(2)SGD容易收斂到局部最優,並且在某些情況下可能被困在鞍點
3.Momentum法
Momentum是模擬物理學中動量的概念,更新時在一定程度上保留之前的更新方向,利用當前的批次再微調本次的更新參數,因此引入一個新的變量v(速度),作為前幾次梯度的累加。因此,Momentum能夠更新學習率,再下降初期,前后梯度方向一致時,能夠加速學習;在下降的中后期,在局部最小值的附近來回震盪時,能夠抑制震盪,加快收斂。
4.Nesterov Momentum法
標准Momentum法首先計算一個梯度,然后在加速更新梯度的方向進行一個大的跳躍;Nesterov項首先在原來加速的梯度方向進行一個大的跳躍,然后再該位置計算梯度值,然后用這個梯度值修正最終的更新方向。
5.Adagrad法
Adagrad法能夠自適應地為各個參數分配不同的學習率,能夠控制每個維度的梯度方向。這種方法的優點是能夠實現學習率的自動更改:如果本次更新時梯度大,學習率就衰減得快一些;如果這次更新時梯度小,學習率衰減得就慢一些。
6.Adadelta法
Adagrad法仍然存在一些問題,其學習率單調遞減,在訓練的后期學習率非常小,並且需要手動設置一個全局的初始化學習率。Adadelta法用一階的方法,近似模擬二階牛頓法,解決了這些問題
7.RMSprop法
RMSprop法與Momentum法類似,通過引入一個衰減系數,使每一回合都衰減一定比例,在實踐中,對循環神經網絡(RNN)效果很好
8.Adam法
Adam法根據損失函數針對每個參數的梯度的一階矩估計和二階矩估計動態調整每個參數的學習率。
模型的存儲和加載
訓練好一個神經網絡后,我們希望能夠將其應用在預測數據上。那么,如何把模型存儲起來呢?同時,對於一個已經存儲起來的模型,在將其應用在預測數據上時又如何加載呢?
Tensorflow的API提供了以下兩種方式來存儲和加載模型
(1)生成檢查點文件,擴展為一般為.ckpt,通過在tf.train.Saver對象上調用Saver.save()生成。它包含權重和其他在程序中定義的變量,不包含圖結構。如果需要在另一個程序中使用,需要重新創建圖形結構,並告訴Tensorflow如何處理這些權重。
(2)生成圖協議文件,這是一個二進制文件,擴展名一般為.pb,用tf.train.write_graph()保存,只包含圖形結構,不包含權重,然后使用tf.import_graph_def()來加載圖形。
模型的存儲與加載
模型存儲主要是建立一個tf.train.Saver()來保存變量,並且指定保存的位置,一般模型的擴展名為.ckpt。
存儲模型
import os import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data def init_weight(shape): return tf.Variable(tf.random_normal(shape,stdde=0.01)) def model(X,w_h,w_h2,w_o,p_keep_input,p_keep_hidden): X=tf.nn.dropout(X,p_keep_input) h=tf.nn.relu(tf.matmul(X,w_h)) h=tf.nn.dropout(h,p_keep_hidden) h2=tf.nn.relu(tf.matmul(h,w_h2)) h2=tf.nn.dropout(h2,p_keep_hidden) return tf.matmul(h2,w_o) mnist=input_data.read_data_sets("MNIST_data/",one_hot=True) trX,trY,teX,teY=mnist.train.images,mnist.train.labels,mnist.test.images,mnist.test.labels X=tf.placeholder("float",[None,784]) Y=tf.placeholder("float",[None,10]) w_h=init_weight([784,625]) w_h2=init_weight([625,625]) w_o=init_weight([625,10]) p_keep_input=tf.placeholder("float") p_keep_hidden=tf.placeholder("float") py_x=model(X,w_h,w_h2,w_o,p_keep_input,p_keep_hidden) cost=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(py_x,Y)) train_op=tf.train.RMSPropOptimizer(0.001,0.9).minimize(cost) predict_op=tf.argmax(py_x,1) ckpt_dir="./ckpt_dir" if not os.path.exists(ckpt_dir): os.makedirs(ckpt_dir) #定義一個計數器,為訓練輪數計數 global_step=tf.Variable(0,name='global_step',trainable=False) saver=tf.train.Saver() non_storable_variable=tf.Variable(777) with tf.Session() as sess: tf.initialize_all_variables().run() start=global_step.eval() print("Start from:",start) for i in range(start,100): for start,end in zip(range(0,len(trX),128),range(128,len(trX)+1,128)): sess.run(train_op,feed_dict={X:trX[start:end],Y:trY[start:end],p_keep_input:0.8,p_keep_hidden:0.5}) global_step.assign(i).eval() saver.save(sess,ckpt_dir+"/model.ckpt",global_step=global_step)#存儲模型
加載模型
import tensorflow as tf with tf.Session() as sess: tf.initialize_all_variables().run() ckpt=tf.train.get_checkpoint_state(ckpt_dir) if ckpt and ckpt.model_checkpoint_path: print(ckpt.model_checkpoint_path) saver.restore(sess,ckpt.model_checkpoint_path)#加載所有的參數 #從這里開始就可以直接使用模型進行預測,或者接着繼續訓練
圖的存儲與加載
當僅保存圖模型時,才將圖寫入二進制協議文件中
import tensorflow as tf v=tf.Variable(0,name='my_variable') sess=tf.Session() tf.train.write_graph(sess.graph_def,'/temp/tfmodel','train.pbtxt') with tf.Session() as _sess: with gfile.FastGFile("/tmp/tfmodel/train.pbtxt","rb") as f: graph_def=tf.GraphDef() graph_def.ParseFromString(f.read()) _sess.graph.as_default() tf.import_graph_def(graph_def,name='tfgraph')
隊列和線程
和Tensorflow中的其他組件一樣,隊列(queue)本身也是圖中的一個節點,是一種有狀態的節點,其他節點,如入隊節點(enqueue)和出隊節點(dequeue),可以修改它的內容。
隊列
Tensorflow中主要有兩種隊列,即FIFOQueue和RandomShuffleQueue。
1.FIFOQueue
FIFOQueue創建一個先入先出隊列。例如,我們在訓練一些語音、文字樣本時,使用循環神經網絡的網絡結構,希望讀入的訓練樣本是有序的,就要用FIFOQueue
import tensorflow as tf q=tf.FIFOQueue(3,"float") init=q.enqueue_many(([0.1,0.2,0.3],)) x=q.dequeue() y=x+1 q_inc=q.enqueue([y]) with tf.Session() as sess: sess.run(init) quelen=sess.run(q.size()) for i in range(2):#執行2次操作 sess.run(q_inc) quelen=sess.run(q.size()) for i in range(quelen): print(sess.run(q.dequeue()))#輸出隊列的值
2.RandomShuffleQueue
RandomShuffleQueue創建一個隨機隊列,在出隊列時,是以隨機的順序產生元素的。例如,我們在訓練一些圖像樣本時,使用CNN的網絡結構,希望可以無序地讀入訓練樣本,就要用RandomShuffleQueue,每次隨機產生一個訓練樣本。
RandomShuffleQueue在Tensorflow使用異步計算時非常重要。因為Tensorflow的會話是支持多線程的,我們可以在主線程里執行訓練操作,使用RandomShuffleQueue作為訓練輸入,開多個線程來准備訓練樣本,將樣本壓入隊列后,主線程會從隊列中每次取出mini-batch的樣本進行訓練
q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")#隊列最大長度為10,出隊后最小長度為2 sess=tf.Session() for i in range(0,10):#10次入隊 sess.run(q.enqueue(i)) for i in range(0,8):#8次出隊 print(sess.run(q.dequeue()))
我們嘗試修改入隊次數為12次,再運行,發現程序阻斷不動,或者我們嘗試修改出隊此時為10次,即不保留隊列最小長度,發現隊列輸出8次結果后,在終端仍然阻斷了。
阻斷一般發生在:
1.隊列長度等於最小值,執行出隊操作
2.隊列長度等於最大值,執行入隊操作
上面的例子都是在會話的主線程中進行入隊操作。當數據量很大時,入隊操作從硬盤中讀取數據,放入內存中,主線程需要等待入隊操作完成,才能進行訓練操作。會話中可以運行多個線程,我們使用線程管理器QueueRunner創建一系列的新線程進行入隊操作,讓主線程繼續使用數據,即訓練網絡和讀取數據是異步的,主線程在訓練網絡,另一個線程在將數據從硬盤讀入內存。
隊列管理器
我們創建一個含有隊列的圖:
import tensorflow as tf q=tf.FIFOQueue(1000,"float") counter=tf.Variable(0.0)#計數器 increment_op=tf.assign_add(counter,tf.constant(1.0))#給計數器加一 enqueue_op=q.enqueue(counter)#計數器值加入隊列 #創建一個隊列管理器QueueRunner,用這兩個操作向隊列q中添加元素。目前我們只使用一個線程: qr=tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1) #啟動一個會話,從隊列管理器qr中創建線程 with tf.Session() as sess: sess.run(tf.global_variables_initializer()) enqueue_threads=qr.create_threads(sess,start=True)#啟動入隊線程 for i in range(10): print(sess.run(q.dequeue()))
結果
1.0 1.0 2.0 3.0 5.0 6.0 7.0 8.0 9.0 10.0 ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled ERROR:tensorflow:Exception in QueueRunner: Session has been closed. Exception in thread Thread-22: Traceback (most recent call last): File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run enqueue_callable() File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1245, in _single_tensor_run fetch_list_as_strings, [], status, None) File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__ next(self.gen) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status pywrap_tensorflow.TF_GetCode(status)) tensorflow.python.framework.errors_impl.CancelledError: Run call was cancelled Exception in thread Thread-23: Traceback (most recent call last): File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run enqueue_callable() File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run target_list_as_strings, status, None) File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__ next(self.gen) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status pywrap_tensorflow.TF_GetCode(status)) tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.
能輸出結果,但最后會異常:
ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled
ERROR:tensorflow:Exception in QueueRunner: Session has been closed.
我們知道,使用with tf.Session的話,會話執行結束會自動關閉,相當於main函數已經結束,
固也就有 Session has been closed.的錯誤。
import tensorflow as tf # 創建一個含有隊列的圖 q = tf.FIFOQueue(1000,"float") # 創建一個長度為1000的隊列 counter = tf.Variable(0.0) # 計數器 increment_op = tf.assign_add(counter,tf.constant(1.0)) # 操作:給計數器加1 enqueque_op = q.enqueue(counter) # 操作:計數器值加入隊列 # 創建一個隊列管理器 QueueRunner,用這兩個操作向隊列 q 中添加元素,啟動一個線程。 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 啟動一個會話,從隊列管理器qr中創建線程 # 主線程 sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) # 主線程 for i in range(10): print(sess.run(q.dequeue()))
使用Session就不會自動關閉,也就沒有了上面例子中的異常了,雖然沒有了異常,但也和我們
設想的會打印順序的1,2,3,4,5…不一樣,而且像第一個例子中還會重復打印1.0,這是為什么呢?
這個本質是+1操作和入隊操作是異步的,也就是說如果加1操作執行了很多次之后,才執行一次入隊的話,就會出現入隊不是按我們預想的順序那樣;反過來,當我執行幾次入隊之后,才執行一次加1操作就會出現一個數重復入隊的情況。
那該怎么解決這個問題呢!下面為幾種解決的方法
# 方法1 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 把兩個操作變成列表中的一個元素 # 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法2 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) # 原 enqueque_op = q.enqueue(counter) # 把加一操作變成入隊操作的依賴 with tf.control_dependencies([increment_op]): enqueque_op = q.enqueue(counter) # 由於將加1變成了入隊的依賴,所以入隊操作只需要傳入enqueque_op就行了 qr = tf.train.QueueRunner(q,enqueue_ops=[enqueque_op]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法3 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 把兩個操作變成空操作的依賴 with tf.control_dependencies([increment_op,enqueque_op]): void_op = tf.no_op() # 由於將兩個操作變成了空操作的依賴,所以入隊操作只需要傳入void_op就行了 qr = tf.train.QueueRunner(q,enqueue_ops=[void_op]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法4 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 用tf.group()把兩個操作組合起來 qr = tf.train.QueueRunner(q,enqueue_ops=[tf.group(increment_op,enqueque_op)]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
線程和協調器
QueueRunner 有一個問題就是:入隊線程自顧自地執行,在需要的出隊操作完成之后,程序沒法結束。這樣就要使用 tf.train.Coordinator 來實現線程間的同步,終止其他線程。
import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) # 主線程 sess = tf.Session() sess.run(tf.global_variables_initializer()) #coordinator:協調器,協調線程間的關系可以被當做一種信號量,起同步作用 coord = tf.train.Coordinator() # 啟動入隊線程,協調器是線程的參數 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) # 主線程 for i in range(0,10): print(sess.run(q.dequeue())) coord.request_stop() # 通知其他線程關閉 # join操作等待其他線程結束,其他所有的線程關閉后,這個函數才能返回 coord.join(enqueue_threads)
在關閉隊列線程后,再執行出隊操作,就會拋出 tf.errors.OutOfRange 錯誤。這種情況就需要
使用 tf.errors.OutOfRangeError 來捕捉錯誤,終止循環:
import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) # 主線程 sess = tf.Session() sess.run(tf.global_variables_initializer()) #coordinator:協調器,協調線程間的關系可以被當做一種信號量,起同步作用 coord = tf.train.Coordinator() # 啟動入隊線程,協調器是線程的參數 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) coord.request_stop() # 通知其他線程關閉 # 主線程 for i in range(0,10): try: print("i : ",i) print(sess.run(q.dequeue())) except tf.errors.OutOfRangeError: print('finish') break # join操作等待其他線程結束,其他所有的線程關閉后,這個函數才能返回 coord.join(enqueue_threads)
i : 0 1.0 i : 1 finish
說明:從打印出來的信息我們可以看出,將請求線程關閉放置在出隊的前面,也就是說我還沒有出隊之前就請求將線程關閉了,但關閉線程需要一定的時間,所以后來在遍歷出隊是還是可以執行的線程關閉后,如果不拋異常的話就像上個例子那樣會報錯,所以這里執行了異常,並打印出了“finish