一、tensorflow讀取機制圖解
我們必須要把數據先讀入后才能進行計算,假設讀入用時0.1s,計算用時0.9s,那么就意味着每過1s,GPU都會有0.1s無事可做,這就大大降低了運算的效率。
解決這個問題方法就是將讀入數據和計算分別放在兩個線程中,將數據讀入內存的一個隊列,如下圖所示:

讀取線程源源不斷地將文件系統中的圖片讀入到一個內存的隊列中,而負責計算的是另一個線程,計算需要數據時,直接從內存隊列中取就可以了。這樣就可以解決GPU因為IO而空閑的問題!
在tensorflow中,為了方便管理,在內存隊列前又添加了一層所謂的“文件名隊列”。tensorflow使用文件名隊列+內存隊列雙隊列的形式讀入文件,可以很好地管理epoch。下面我們用圖片的形式來說明這個機制的運行方式。


如果再嘗試讀入,系統由於檢測到了“結束”,就會自動拋出一個異常(OutOfRange)。外部捕捉到這個異常后就可以結束程序了。這就是tensorflow中讀取數據的基本機制。
二、tensorflow讀取數據機制的對應函數
對於文件名隊列,我們使用tf.train.string_input_producer函數。這個函數需要傳入一個文件名list,系統會自動將它轉為一個文件名隊列。tf.train.string_input_producer還有兩個重要的參數,一個是num_epochs,表示epoch數。另外一個就是shuffle是指在一個epoch內文件的順序是否被打亂。
在tensorflow中,內存隊列不需要我們自己建立,我們只需要使用reader對象從文件名隊列中讀取數據就可以了。
在我們使用tf.train.string_input_producer創建文件名隊列后,整個系統其實還是處於“停滯狀態”的,也就是說,我們文件名並沒有真正被加入到隊列中,此時如果我們開始計算,因為內存隊列中什么也沒有,計算單元就會一直等待,導致整個系統被阻塞。使用tf.train.start_queue_runners之后,才會啟動填充隊列的線程,這時系統就不再“停滯”。此后計算單元就可以拿到數據並進行計算,整個程序也就跑起來了。
reader每次讀取一張圖片並保存。
1 import tensorflow as tf 2 3 # 新建一個Session 4 with tf.Session() as sess: 5 # 我們要讀三幅圖片A.jpg, B.jpg, C.jpg 6 filename = ['A.jpg', 'B.jpg', 'C.jpg'] 7 # string_input_producer會產生一個文件名隊列 8 filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5) 9 # reader從文件名隊列中讀數據。對應的方法是reader.read 10 reader = tf.WholeFileReader() 11 key, value = reader.read(filename_queue) 12 # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化 13 tf.local_variables_initializer().run() 14 # 使用start_queue_runners之后,才會開始填充隊列 15 threads = tf.train.start_queue_runners(sess=sess) 16 i = 0 17 while True: 18 i += 1 19 # 獲取圖片數據並保存 20 image_data = sess.run(value) 21 with open('read/test_%d.jpg' % i, 'wb') as f: 22 f.write(image_data)
三個概念:
Queue是TF隊列和緩存機制的實現QueueRunner是TF中對操作Queue的線程的封裝Coordinator是TF中用來協調線程運行的工具
Queue:
- tf.FIFOQueue 按入列順序出列的隊列
- tf.RandomShuffleQueue 隨機順序出列的隊列
- tf.PaddingFIFOQueue 以固定長度批量出列的隊列
- tf.PriorityQueue 帶優先級出列的隊列
創建函數的參數:
tf.FIFOQueue(capacity, dtypes, shapes=None, names=None ...)
1 import tensorflow as tf 2 tf.InteractiveSession() 3 4 q = tf.FIFOQueue(2, "float") 5 init = q.enqueue_many(([0,0],)) 6 7 x = q.dequeue() 8 y = x+1 9 q_inc = q.enqueue([y]) 10 11 init.run() 12 q_inc.run() 13 q_inc.run() 14 q_inc.run() 15 x.eval() # 返回1 16 x.eval() # 返回2 17 x.eval() # 卡住
QueueRunner
Tensorflow的計算主要在使用CPU/GPU和內存,而數據讀取涉及磁盤操作,速度遠低於前者操作。因此通常會使用多個線程讀取數據,然后
使用一個線程消費數據,QueueRunner就是來管理這些讀寫隊列的線程。
1 import tensorflow as tf 2 import sys 3 q = tf.FIFOQueue(10, "float") 4 counter = tf.Variable(0.0) #計數器 5 # 給計數器加一 6 increment_op = tf.assign_add(counter, 1.0) 7 # 將計數器加入隊列 8 enqueue_op = q.enqueue(counter) 9 10 # 創建QueueRunner 11 # 用多個線程向隊列添加數據 12 # 這里實際創建了4個線程,兩個增加計數,兩個執行入隊 13 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) 14 15 # 主線程 16 sess = tf.InteractiveSession() 17 tf.global_variables_initializer().run() 18 # 啟動入隊線程 19 qr.create_threads(sess, start=True) 20 for i in range(20): 21 print (sess.run(q.dequeue()))
增加計數的進程會不停的后台運行,執行入隊的進程會先執行10次(因為隊列長度只有10),然后主線程開始消費數據,當一部分數據消費被后,入隊的進程又會開始執行。最終主線程消費完20個數據后停止,但其他線程繼續運行,程序不會結束。
Coordinator:
用來保存線程組運行狀態的協調器對象
1 import tensorflow as tf 2 import threading, time 3 4 # 子線程函數 5 def loop(coord, id): 6 t = 0 7 while not coord.should_stop(): 8 print(id) 9 time.sleep(1) 10 t += 1 11 # 只有1號線程調用request_stop方法 12 if (t >= 2 and id == 1): 13 coord.request_stop() 14 15 # 主線程 16 coord = tf.train.Coordinator() 17 # 使用Python API創建10個線程 18 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)] 19 20 # 啟動所有線程,並等待線程結束 21 for t in threads: t.start() 22 coord.join(threads)
所有的子線程執行完兩個周期后都會停止,主線程會等待所有子線程都停止后結束,從而使整個程序結束。由此可見,只要有任何一個線程調用了Coordinator的request_stop方法,所有的線程都可以通過should_stop方法感知並停止當前線程。
ALL:
第一種,顯式的創建QueueRunner,然后調用它的create_threads方法啟動線程。例如下面這段代碼:
1 import tensorflow as tf 2 3 # 1000個4維輸入向量,每個數取值為1-10之間的隨機數 4 data = 10 * np.random.randn(1000, 4) + 1 5 # 1000個隨機的目標值,值為0或1 6 target = np.random.randint(0, 2, size=1000) 7 8 # 創建Queue,隊列中每一項包含一個輸入數據和相應的目標值 9 queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []]) 10 11 # 批量入列數據(這是一個Operation) 12 enqueue_op = queue.enqueue_many([data, target]) 13 # 出列數據(這是一個Tensor定義) 14 data_sample, label_sample = queue.dequeue() 15 16 # 創建包含4個線程的QueueRunner 17 qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) 18 19 with tf.Session() as sess: 20 # 創建Coordinator 21 coord = tf.train.Coordinator() 22 # 啟動QueueRunner管理的線程 23 enqueue_threads = qr.create_threads(sess, coord=coord, start=True) 24 # 主線程,消費100個數據 25 for step in range(100): 26 if coord.should_stop(): 27 break 28 data_batch, label_batch = sess.run([data_sample, label_sample]) 29 # 主線程計算完成,停止所有采集數據的進程 30 coord.request_stop() 31 coord.join(enqueue_threads)
第二種,使用全局的start_queue_runners方法啟動線程。
在這個例子中,tf.train.string_input_produecer將一個隱含的QueueRunner添加到全局圖中,類似的操作還有tf.train.shuffle_batch等)。由於沒有顯式地返回QueueRunner來用create_threads啟動線程,這里用tf.train.start_queue_runners方法直接啟動tf.GraphKeys.QUEUE_RUNNERS集合中的所有隊列線程。
1 import tensorflow as tf 2 3 # 同時打開多個文件,顯示創建Queue,同時隱含了QueueRunner的創建 4 filename_queue = tf.train.string_input_producer(["data1.csv","data2.csv"]) 5 reader = tf.TextLineReader(skip_header_lines=1) 6 # Tensorflow的Reader對象可以直接接受一個Queue作為輸入 7 key, value = reader.read(filename_queue) 8 9 with tf.Session() as sess: 10 coord = tf.train.Coordinator() 11 # 啟動計算圖中所有的隊列線程 12 threads = tf.train.start_queue_runners(coord=coord) 13 # 主線程,消費100個數據 14 for _ in range(100): 15 features, labels = sess.run([data_batch, label_batch]) 16 # 主線程計算完成,停止所有采集數據的進程 17 coord.request_stop() 18 coord.join(threads)
這兩種方式在效果上是等效的
1 import pandas as pd 2 import numpy as np 3 import tensorflow as tf 4 5 6 def generate_data(): 7 num = 25 8 label = np.asarray(range(0, num)) 9 images = np.random.random([num, 5]) 10 print('label size :{}, image size {}'.format(label.shape, images.shape)) 11 return images,label 12 13 def get_batch_data(): 14 images, label = generate_data() 15 input_queue = tf.train.slice_input_producer([images, label], shuffle=False,num_epochs=2) 16 image_batch, label_batch = tf.train.batch(input_queue, batch_size=5, num_threads=1, capacity=64,allow_smaller_final_batch=False) 17 return image_batch,label_batch 18 19 20 images,label = get_batch_data() 21 sess = tf.Session() 22 sess.run(tf.global_variables_initializer()) 23 sess.run(tf.local_variables_initializer())#這一行必須加,因為slice_input_producer的原因 24 coord = tf.train.Coordinator() 25 threads = tf.train.start_queue_runners(sess,coord) 26 try: 27 while not coord.should_stop(): 28 i,l = sess.run([images,label]) 29 print(i) 30 print(l) 31 except tf.errors.OutOfRangeError: 32 print('Done training') 33 finally: 34 coord.request_stop() 35 coord.join(threads) 36 sess.close()
使用隊列機制不需要 feed_dict,不再浪費內存,並提高GPU的利用率,節省訓練時間
文件准備
|
1
2
3
4
5
6
7
|
$
echo -e "Alpha1,A1\nAlpha2,A2\nAlpha3,A3" > A.csv
$
echo -e "Bee1,B1\nBee2,B2\nBee3,B3" > B.csv
$
echo -e "Sea1,C1\nSea2,C2\nSea3,C3" > C.csv
$ cat A.csv
Alpha1,A1
Alpha2,A2
Alpha3,A3
|
單個Reader,單個樣本
1 import tensorflow as tf 2 # 生成一個先入先出隊列和一個QueueRunner 3 filenames = ['A.csv', 'B.csv', 'C.csv'] 4 filename_queue = tf.train.string_input_producer(filenames, shuffle=False) 5 # 定義Reader 6 reader = tf.TextLineReader() 7 key, value = reader.read(filename_queue) 8 # 定義Decoder 9 example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']]) 10 # 運行Graph 11 with tf.Session() as sess: 12 coord = tf.train.Coordinator() #創建一個協調器,管理線程 13 threads = tf.train.start_queue_runners(coord=coord) #啟動QueueRunner, 此時文件名隊列已經進隊。 14 for i in range(10): 15 print example.eval() #取樣本的時候,一個Reader先從文件名隊列中取出文件名,讀出數據,Decoder解析后進入樣本隊列。 16 coord.request_stop() 17 coord.join(threads) 18 # outpt 19 Alpha1 20 Alpha2 21 Alpha3 22 Bee1 23 Bee2 24 Bee3 25 Sea1 26 Sea2 27 Sea3 28 Alpha1
單個Reader,多個樣本
1 import tensorflow as tf 2 filenames = ['A.csv', 'B.csv', 'C.csv'] 3 filename_queue = tf.train.string_input_producer(filenames, shuffle=False) 4 reader = tf.TextLineReader() 5 key, value = reader.read(filename_queue) 6 example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']]) 7 # 使用tf.train.batch()會多加了一個樣本隊列和一個QueueRunner。Decoder解后數據會進入這個隊列,再批量出隊。 8 # 雖然這里只有一個Reader,但可以設置多線程,相應增加線程數會提高讀取速度,但並不是線程越多越好。 9 example_batch, label_batch = tf.train.batch( 10 [example, label], batch_size=5) 11 with tf.Session() as sess: 12 coord = tf.train.Coordinator() 13 threads = tf.train.start_queue_runners(coord=coord) 14 for i in range(10): 15 print example_batch.eval() 16 coord.request_stop() 17 coord.join(threads) 18 # output 19 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] 20 # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1'] 21 # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3'] 22 # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2'] 23 # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1'] 24 # ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3'] 25 # ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2'] 26 # ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1'] 27 # ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3'] 28 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
多Reader,多個樣本
1 import tensorflow as tf 2 filenames = ['A.csv', 'B.csv', 'C.csv'] 3 filename_queue = tf.train.string_input_producer(filenames, shuffle=False) 4 reader = tf.TextLineReader() 5 key, value = reader.read(filename_queue) 6 record_defaults = [['null'], ['null']] 7 example_list = [tf.decode_csv(value, record_defaults=record_defaults) 8 for _ in range(2)] # Reader設置為2 9 # 使用tf.train.batch_join(),可以使用多個reader,並行讀取數據。每個Reader使用一個線程。 10 example_batch, label_batch = tf.train.batch_join( 11 example_list, batch_size=5) 12 with tf.Session() as sess: 13 coord = tf.train.Coordinator() 14 threads = tf.train.start_queue_runners(coord=coord) 15 for i in range(10): 16 print example_batch.eval() 17 coord.request_stop() 18 coord.join(threads) 19 20 # output 21 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] 22 # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1'] 23 # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3'] 24 # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2'] 25 # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1'] 26 # ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3'] 27 # ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2'] 28 # ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1'] 29 # ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3'] 30 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
與
tf.train.batchtf.train.shuffle_batch函數是單個Reader讀取,但是可以多線程。tf.train.batch_join與tf.train.shuffle_batch_join可設置多Reader讀取,每個Reader使用一個線程。至於兩種方法的效率,單Reader時,2個線程就達到了速度的極限。多Reader時,2個Reader就達到了極限。所以並不是線程越多越快,甚至更多的線程反而會使效率下降。
迭代控制
1 filenames = ['A.csv', 'B.csv', 'C.csv'] 2 filename_queue = tf.train.string_input_producer(filenames, shuffle=False, num_epochs=3) # num_epoch: 設置迭代數 3 reader = tf.TextLineReader() 4 key, value = reader.read(filename_queue) 5 record_defaults = [['null'], ['null']] 6 example_list = [tf.decode_csv(value, record_defaults=record_defaults) 7 for _ in range(2)] 8 example_batch, label_batch = tf.train.batch_join( 9 example_list, batch_size=5) 10 init_local_op = tf.initialize_local_variables() 11 with tf.Session() as sess: 12 sess.run(init_local_op) # 初始化本地變量 13 coord = tf.train.Coordinator() 14 threads = tf.train.start_queue_runners(coord=coord) 15 try: 16 while not coord.should_stop(): 17 print example_batch.eval() 18 except tf.errors.OutOfRangeError: 19 print('Epochs Complete!') 20 finally: 21 coord.request_stop() 22 coord.join(threads) 23 coord.request_stop() 24 coord.join(threads) 25 # output 26 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] 27 # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1'] 28 # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3'] 29 # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2'] 30 # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1'] 31 # Epochs Complete!
參考自:在迭代控制中,記得添加tf.initialize_local_variables(),官網教程沒有說明,但是如果不初始化,運行就會報錯。
https://zhuanlan.zhihu.com/p/27238630
http://www.jianshu.com/p/d063804fb272
