6、TensorFlow基礎(四)隊列和線程


隊列和線程

  和 TensorFlow 中的其他組件一樣,隊列(queue)本身也是圖中的一個節點,是一種有狀態的節點,其他節點,如入隊節點(enqueue)和出隊節點(dequeue),可以修改它的內容。例如,入隊節點可以把新元素插到隊列末尾,出隊節點可以把隊列前面的元素刪除。本節主要介紹隊列、隊列管理器、線程和協調器的有關知識。

1、隊列:

  TensorFlow 中主要有兩種隊列,即 FIFOQueue 和 RandomShuffleQueue,它們的源代碼實現在 tensorflow-1.1.0/tensorflow/python/ops/data_flow_ops.py 中。

  (1)、FIFOQueue

   FIFOQueue創建一個先入先出隊列。列如,我們在訓練一些語音、文字樣本時,使用循環神經網絡的網絡結構,希望讀入的訓練樣本是有序的,就要用FIFOQUEUE。

  我們行創建一個含有隊列的圖:

 1 # -*- coding: UTF-8 -*-
 2 # date:2018/6/22
 3 # User:WangHong
 4 import tensorflow as tf
 5 #創建一個先入先出的隊列,初始化隊列插入0.1,0.2,0.3三個數字
 6 q = tf.FIFOQueue(3,'float')
 7 init = q.enqueue_many(([0.1,0.2,0.3],))
 8 #定義出隊、+1、入隊操作
 9 x =q.dequeue()
10 y = x+1
11 q_inc = q.enqueue([y])
12 #然后開啟一個會話,執行2次q_inc操作,隨后查看隊列內容。
13 with tf.Session() as sess:
14     sess.run(init)
15     quelen = sess.run(q.size())
16     for i in range(2):
17         sess.run(q_inc)#執行2次操作,隊列中的值變為0.3,1.1,1.2
18     quelen = sess.run(q.size())
19     for i in range(quelen):
20         print(sess.run(q.dequeue()))#輸出隊列的值

結果:

     

  (2)、RandomShuffleQueue

    RandomShuffleQueue創建一個隨機隊列,在出隊列時,是以隨機的順序產生元素的,例如,我們在訓練一些圖像樣本是,使用CNN的網絡結構,希望。可以無序的讀入訓練樣本,就要用RandomShuffleQueue,每次隨機產生一個訓練樣本。

    RandomShuffleQueue在在TensorFlow使用異步計算時很重要。因為TensorFlow的會話是支持多線程的,我們可以在主線程里執行訓練操作,使用RandomShuffleQueue作為訓練輸入,開多線程來准備訓練樣本,將樣本壓入隊列后,主線性會從線程中每次取出mini-batch的樣本進行訓練。

例子;

 1 # -*- coding: UTF-8 -*-
 2 # date:2018/6/22
 3 # User:WangHong
 4 import tensorflow as tf
 5 q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes = 'float')
 6 #然后開啟一個會話
 7 sess = tf.Session()
 8 for i in range(0,10):#10次入隊
 9     sess.run(q.enqueue(i))
10 for i in range(0,8):#8次出隊
11     print(sess.run(q.dequeue()))

結果:發現結果是亂序

     

我們嘗試修改入隊次數為 12 次,再運行,發現程序阻斷不動,或者我們嘗試修改出隊此
時為 10 次,即不保留隊列最小長度,發現隊列輸出 8 次結果后,在終端仍然阻斷了。

阻斷一般發生在:
● 隊列長度等於最小值,執行出隊操作;
● 隊列長度等於最大值,執行入隊操作。

  上面的例子都是在會話的主線程中進行入隊操作。當數據量很大時,入隊操作從硬盤中讀
取數據,放入內存中,主線程需要等待入隊操作完成,才能進行訓練操作。會話中可以運行多
個線程,我們使用線程管理器 QueueRunner 創建一系列的新線程進行入隊操作,讓主線程繼續
使用數據,即訓練網絡和讀取數據是異步的,主線程在訓練網絡,另一個線程在將數據從硬盤
讀入內存。

 2、隊列管理器

  創建一個含有隊列的圖:

 1 # -*- coding: UTF-8 -*-
 2 # date:2018/6/22
 3 # User:WangHong
 4 import tensorflow as tf
 5 q = tf.FIFOQueue(1000,'float')
 6 counter = tf.Variable(0.0)#計數器
 7 increment_op = tf.assign_add(counter,tf.constant(1.0))#操作給計數器加一
 8 enqueue_op = q.enqueue(counter)#操作;計數器值加入隊列
 9 #創建UI個隊列計數器QueueRunner,用這兩個操作向隊列q添加元素。目前使用一個線程
10 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
11 #啟動一個會話,從隊列管理器qr中創建線程:
12 #主線程
13 with tf.Session() as sess:
14     sess.run(tf.global_variables_initializer())
15     enqueue_threads = qr.create_threads(sess,start=True)#啟動入隊線程
16     for i in range(10):
17         print(sess.run(q.dequeue()))

結果:

     

  

  不是我們期待的自然數列,並且線程被阻斷。這是因為加 1 操作和入隊操作不同步,可能
加 1 操作執行了很多次之后,才會進行一次入隊操作。另外,因為主線程的訓練(出隊操作)
和讀取數據的線程的訓練(入隊操作)是異步的,主線程會一直等待數據送入。

  QueueRunner 有一個問題就是:入隊線程自顧自地執行,在需要的出隊操作完成之后,程
序沒法結束。這樣就要使用 tf.train.Coordinator 來實現線程間的同步,終止其他線程。

 3、線程協調器

 1 # -*- coding: UTF-8 -*-
 2 # date:2018/6/22
 3 # User:WangHong
 4 import tensorflow as tf
 5 q = tf.FIFOQueue(1000,'float')
 6 counter = tf.Variable(0.0)#計數器
 7 increment_op = tf.assign_add(counter,tf.constant(1.0))#操作給計數器加一
 8 enqueue_op = q.enqueue(counter)#操作;計數器值加入隊列
 9 #創建UI個隊列計數器QueueRunner,用這兩個操作向隊列q添加元素。目前使用一個線程
10 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
11 #啟動一個會話,從隊列管理器qr中創建線程:
12 #主線程
13 with tf.Session() as sess:
14     sess.run(tf.global_variables_initializer())
15     coord = tf.train.Coordinator()
16     enqueue_threads = qr.create_threads(sess,start=True)#啟動入隊線程
17     coord.request_stop()  # 通知其他線程關閉
18     for i in range(10):
19         try:
20             print(sess.run(q.dequeue()))
21         except tf.errors.OutOfRangeError:
22             break
23     coord.join(enqueue_threads)#join操作等待其他線程結束,其他所有線程關閉之后,這一函數才能返回

所有隊列管理器被默認加在圖的 tf.GraphKeys.QUEUE_RUNNERS 集合中。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM