在使用TensorFlow進行異步計算時,隊列是一種強大的機制。
為了感受一下隊列,讓我們來看一個簡單的例子。我們先創建一個“先入先出”的隊列(FIFOQueue),並將其內部所有元素初始化為某些值。然后,我們構建一個TensorFlow圖,它從隊列前端取走一個元素,加上1之后,放回隊列的后端。慢慢地,隊列的元素的值就會增加。
TensorFlow提供了兩個類來幫助多線程的實現:tf.Coordinator和 tf.QueueRunner。Coordinator類可以用來同時停止多個工作線程並且向那個在等待所有工作線程終止的程序報告異常,QueueRunner類用來協調多個工作線程同時將多個張量推入同一個隊列中。
Tensorflow隊列
隊列,如FIFOQueue(先進先出隊列,按順序出隊列)和RandomShuffleQueue(隨機出隊列),在TensorFlow的張量異步計算時都非常重要。
FIFOQueue(capacity, dtypes, name='fifo_queue'):創建一個以先進先出的順序對元素進行排隊的隊列
- capacity:整數。可能存儲在此隊列中的元素數量的上限
- dtypes:DType對象列表。長度dtypes必須等於每個隊列元素中的張量數,dtype的類型形狀,決定了后面進隊列元素形狀
- 方法
- dequeue(name=None)
- enqueue(vals, name=None):
- enqueue_many(vals, name=None):vals列表或者元組,返回一個進隊列操作
- size(name=None)
同步執行隊列
完成一個出隊列、+1、入隊列操作(同步操作):
import tensorflow as tf
# 同步操作,如隊列,+1,出隊列
# 創建一個隊列
Q = tf.FIFOQueue(3, dtypes=tf.float32)
# 數據進隊列
init_q = Q.enqueue_many([[1.0, 2.0, 3.0], ])
# 定義操作
de_q = Q.dequeue()
data = de_q + 1
en_q = Q.enqueue(data)
with tf.Session() as sess:
# 初始化隊列
sess.run(init_q)
# 執行10次 +1 操作
for i in range(10):
sess.run(en_q)
# 取出數據
for i in range(Q.size().eval()):
print(Q.dequeue().eval())
輸出:
5.0
6.0
5.0
當數據量很大時,入隊操作從硬盤中讀取數據,放入內存中,主線程需要等待入隊操作完成,才能進行訓練。會話里可以運行多個線程,實現異步讀取。
隊列管理器
QueueRunner類會創建一組線程, 這些線程可以重復的執行Enquene操作, 他們使用同一個Coordinator來處理線程同步終止。此外,一個QueueRunner會運行一個closer thread,當Coordinator收到異常報告時,這個closer thread會自動關閉隊列。
您可以使用一個queue runner,來實現上述結構。 首先建立一個TensorFlow圖表,這個圖表使用隊列來輸入樣本。增加處理樣本並將樣本推入隊列中的操作。增加training操作來移除隊列中的樣本。
tf.train.QueueRunner(queue, enqueue_ops=None):創建一個QueueRunner
- queue:A Queue
- enqueue_ops:添加線程的隊列操作列表,[]*2,指定兩個線程
- 方法
- create_threads(sess, coord=None,start=False):創建線程來運行給定會話的入隊操作
- start:布爾值,如果True啟動線程;如果為False調用者必須調用start()啟動線程
- coord:線程協調器,后面線程管理需要用到
- create_threads(sess, coord=None,start=False):創建線程來運行給定會話的入隊操作
異步執行隊列
通過隊列管理器來實現變量加1,入隊,主線程出隊列的操作(異步操作):
# 異步操作,變量+1,入隊,出隊列
Q = tf.FIFOQueue(100, dtypes=tf.float32)
# 要做的事情
var = tf.Variable(0.0)
data = tf.assign_add(var, 1)
en_q = Q.enqueue(data)
# 隊列管理器op
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 5)
# 變量初始化op
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化變量
sess.run(init_op)
# 開始子線程
threads = qr.create_threads(sess, start=True)
# 主線程讀取數據
for i in range(50):
print(sess.run(Q.dequeue()))
分析:這時候有一個問題就是,入隊自顧自的去執行,在需要的出隊操作完成之后,程序沒法結束。需要一個實現線程間的同步,終止其他線程。程序執行完成報如下的錯誤:
tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled
[[{{node fifo_queue_enqueue}}]]
線程協調器
tf.train.Coordinator():線程協調員,實現一個簡單的機制來協調一組線程的終止
- should_stop():如果線程應該停止則返回True。
- request_stop(): 請求該線程停止。
- join():等待被指定的線程終止。
首先創建一個Coordinator對象,然后建立一些使用Coordinator對象的線程。這些線程通常一直循環運行,一直到should_stop()返回True時停止。 任何線程都可以決定計算什么時候應該停止。它只需要調用request_stop(),同時其他線程的should_stop()將會返回True,然后都停下來。
加入線程協調器的程序:
# 異步操作,變量+1,入隊,出隊列
Q = tf.FIFOQueue(100, dtypes=tf.float32)
# 要做的事情
var = tf.Variable(0.0)
data = tf.assign_add(var, 1)
en_q = Q.enqueue(data)
# 隊列管理器op
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 5)
# 變量初始化op
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化變量
sess.run(init_op)
# 開啟線程協調器
coord = tf.train.Coordinator()
# 開始子線程
threads = qr.create_threads(sess, coord=coord, start=True)
# 主線程讀取數據
for i in range(50):
print(sess.run(Q.dequeue()))
# 請求停止線程
coord.request_stop()
coord.join()