隊列和線程
和 TensorFlow 中的其他組件一樣,隊列(queue)本身也是圖中的一個節點,是一種有狀態的節點,其他節點,如入隊節點(enqueue)和出隊節點(dequeue),可以修改它的內容。例如,入隊節點可以把新元素插到隊列末尾,出隊節點可以把隊列前面的元素刪除。本節主要介紹隊列、隊列管理器、線程和協調器的有關知識。
1、隊列:
TensorFlow 中主要有兩種隊列,即 FIFOQueue 和 RandomShuffleQueue,它們的源代碼實現在 tensorflow-1.1.0/tensorflow/python/ops/data_flow_ops.py 中。
(1)、FIFOQueue
FIFOQueue創建一個先入先出隊列。列如,我們在訓練一些語音、文字樣本時,使用循環神經網絡的網絡結構,希望讀入的訓練樣本是有序的,就要用FIFOQUEUE。
我們行創建一個含有隊列的圖:
1 # -*- coding: UTF-8 -*- 2 # date:2018/6/22 3 # User:WangHong 4 import tensorflow as tf 5 #創建一個先入先出的隊列,初始化隊列插入0.1,0.2,0.3三個數字 6 q = tf.FIFOQueue(3,'float') 7 init = q.enqueue_many(([0.1,0.2,0.3],)) 8 #定義出隊、+1、入隊操作 9 x =q.dequeue() 10 y = x+1 11 q_inc = q.enqueue([y]) 12 #然后開啟一個會話,執行2次q_inc操作,隨后查看隊列內容。 13 with tf.Session() as sess: 14 sess.run(init) 15 quelen = sess.run(q.size()) 16 for i in range(2): 17 sess.run(q_inc)#執行2次操作,隊列中的值變為0.3,1.1,1.2 18 quelen = sess.run(q.size()) 19 for i in range(quelen): 20 print(sess.run(q.dequeue()))#輸出隊列的值
結果:
(2)、RandomShuffleQueue
RandomShuffleQueue創建一個隨機隊列,在出隊列時,是以隨機的順序產生元素的,例如,我們在訓練一些圖像樣本是,使用CNN的網絡結構,希望。可以無序的讀入訓練樣本,就要用RandomShuffleQueue,每次隨機產生一個訓練樣本。
RandomShuffleQueue在在TensorFlow使用異步計算時很重要。因為TensorFlow的會話是支持多線程的,我們可以在主線程里執行訓練操作,使用RandomShuffleQueue作為訓練輸入,開多線程來准備訓練樣本,將樣本壓入隊列后,主線性會從線程中每次取出mini-batch的樣本進行訓練。
例子;
1 # -*- coding: UTF-8 -*- 2 # date:2018/6/22 3 # User:WangHong 4 import tensorflow as tf 5 q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes = 'float') 6 #然后開啟一個會話 7 sess = tf.Session() 8 for i in range(0,10):#10次入隊 9 sess.run(q.enqueue(i)) 10 for i in range(0,8):#8次出隊 11 print(sess.run(q.dequeue()))
結果:發現結果是亂序
我們嘗試修改入隊次數為 12 次,再運行,發現程序阻斷不動,或者我們嘗試修改出隊此
時為 10 次,即不保留隊列最小長度,發現隊列輸出 8 次結果后,在終端仍然阻斷了。
阻斷一般發生在:
● 隊列長度等於最小值,執行出隊操作;
● 隊列長度等於最大值,執行入隊操作。
上面的例子都是在會話的主線程中進行入隊操作。當數據量很大時,入隊操作從硬盤中讀
取數據,放入內存中,主線程需要等待入隊操作完成,才能進行訓練操作。會話中可以運行多
個線程,我們使用線程管理器 QueueRunner 創建一系列的新線程進行入隊操作,讓主線程繼續
使用數據,即訓練網絡和讀取數據是異步的,主線程在訓練網絡,另一個線程在將數據從硬盤
讀入內存。
2、隊列管理器
創建一個含有隊列的圖:
1 # -*- coding: UTF-8 -*- 2 # date:2018/6/22 3 # User:WangHong 4 import tensorflow as tf 5 q = tf.FIFOQueue(1000,'float') 6 counter = tf.Variable(0.0)#計數器 7 increment_op = tf.assign_add(counter,tf.constant(1.0))#操作給計數器加一 8 enqueue_op = q.enqueue(counter)#操作;計數器值加入隊列 9 #創建UI個隊列計數器QueueRunner,用這兩個操作向隊列q添加元素。目前使用一個線程 10 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1) 11 #啟動一個會話,從隊列管理器qr中創建線程: 12 #主線程 13 with tf.Session() as sess: 14 sess.run(tf.global_variables_initializer()) 15 enqueue_threads = qr.create_threads(sess,start=True)#啟動入隊線程 16 for i in range(10): 17 print(sess.run(q.dequeue()))
結果:
不是我們期待的自然數列,並且線程被阻斷。這是因為加 1 操作和入隊操作不同步,可能
加 1 操作執行了很多次之后,才會進行一次入隊操作。另外,因為主線程的訓練(出隊操作)
和讀取數據的線程的訓練(入隊操作)是異步的,主線程會一直等待數據送入。
QueueRunner 有一個問題就是:入隊線程自顧自地執行,在需要的出隊操作完成之后,程
序沒法結束。這樣就要使用 tf.train.Coordinator 來實現線程間的同步,終止其他線程。
3、線程協調器
1 # -*- coding: UTF-8 -*- 2 # date:2018/6/22 3 # User:WangHong 4 import tensorflow as tf 5 q = tf.FIFOQueue(1000,'float') 6 counter = tf.Variable(0.0)#計數器 7 increment_op = tf.assign_add(counter,tf.constant(1.0))#操作給計數器加一 8 enqueue_op = q.enqueue(counter)#操作;計數器值加入隊列 9 #創建UI個隊列計數器QueueRunner,用這兩個操作向隊列q添加元素。目前使用一個線程 10 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1) 11 #啟動一個會話,從隊列管理器qr中創建線程: 12 #主線程 13 with tf.Session() as sess: 14 sess.run(tf.global_variables_initializer()) 15 coord = tf.train.Coordinator() 16 enqueue_threads = qr.create_threads(sess,start=True)#啟動入隊線程 17 coord.request_stop() # 通知其他線程關閉 18 for i in range(10): 19 try: 20 print(sess.run(q.dequeue())) 21 except tf.errors.OutOfRangeError: 22 break 23 coord.join(enqueue_threads)#join操作等待其他線程結束,其他所有線程關閉之后,這一函數才能返回
所有隊列管理器被默認加在圖的 tf.GraphKeys.QUEUE_RUNNERS 集合中。