准備數據
前期已經將數據生成了tfrecord格式,這里主要是研究如果從tfrecord格式文件中讀取數據batch
讀文件名
獲取tf_record格式的文件名列表
1 tf_record_pattern = os.path.join(FLAGS.data_dir,'%s-*' %self.subset) # subset in ['train','validation'] 2 data_files = tf.gfile.Glob(tf_record_pattern)
或者
1 data_files = tf.train.match_filenames_once(tf.record_pattern)
文件名隊列
用tf.train.string_input_prodecer函數來生成一個隊列,放置文件名,之后文件閱讀器會需要它來讀取數據
注意:並不是一次性將所有文件名都放進隊列,它是有容量大小限制的,也就是說隊列大小為capacity
string_tensor: 文件名列表,一般就是類似 train-00000-of-00008這種
num_epochs=None: 就是說文件名列表放進隊列后是循環無限次,如果設定了特定的值,那么整個隊列只能循環num_epochs次,這里epoch就是指遍歷所有文件名一次,也相當於說對所有數據進行迭代一個epoch
shuffle: 如果為真的話,文件名在每個epoch中都會隨機重排
capacity: 隊列的大小
后面三個參數都不重要
Return: A queue with the output strings. A Queue is added to the current Graph's QUEUE_RUNNER collection.
這個QueueRunner的工作線程是獨立於文件閱讀器的線程,因此亂序和將文件名推入到文件名隊列這些過程不會阻塞文件閱讀器運行。
tf.train.slice_input_producer
這里,如果直接准備的是數據的話,不需要文件名隊列,可以直接產生數據隊列
看函數名也可以看出隊列中的數據時一條slice數據(打亂后shuffle=True),然后拿tf.train.batch()直接取batch就可以獲得亂序batch
文件讀取器
根據不同的文件格式,來選擇對應的文件閱讀器,然后將文件名隊列提供給閱讀器的read方法。閱讀器的read方法會輸出一個key來表征輸入的文件和一個字符串標量解碼成張量從而構造成樣本。
1. 從csv文件中讀取數據,需要使用TextLineReader和decode_csv操作
1 filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"]) 2 3 reader = tf.TextLineReader() 4 key, value = reader.read(filename_queue) 5 6 # Default values, in case of empty columns. Also specifies the type of the 7 # decoded result. 8 record_defaults = [[1], [1], [1], [1], [1]] 9 col1, col2, col3, col4, col5 = tf.decode_csv( 10 value, record_defaults=record_defaults) 11 features = tf.concat(0, [col1, col2, col3, col4]) 12 13 with tf.Session() as sess: 14 # Start populating the filename queue. 15 coord = tf.train.Coordinator() 16 threads = tf.train.start_queue_runners(coord=coord) 17 18 for i in range(1200): 19 # Retrieve a single instance: 20 example, label = sess.run([features, col5]) 21 22 coord.request_stop() 23 coord.join(threads)
每次read
的執行都會從文件中讀取一行內容, decode_csv
操作會解析這一行內容並將其轉為張量列表。如果輸入的參數有缺失,record_default
參數可以根據張量的類型來設置默認值。
在調用run
或者eval
去執行read
之前, 你必須調用tf.train.start_queue_runners
來將文件名填充到隊列。否則read
操作會被阻塞到文件名隊列中有值為止。
2. 從二進制文件中讀取固定長度記錄,可以使用tf.FixedLengthRecordReader的tf.decode_raw操作。decode_raw
操作可以講一個字符串轉換為一個uint8的張量。
舉例來說,the CIFAR-10 dataset的文件格式定義是:每條記錄的長度都是固定的,一個字節的標簽,后面是3072字節的圖像數據。uint8的張量的標准操作就可以從中獲取圖像片並且根據需要進行重組。 例子代碼可以在tensorflow/models/image/cifar10/cifar10_input.py
找到
3.標准tensorflow格式,將數據轉換成TFRecords文件
從TFRecords文件中讀取數據, 可以使用tf.TFRecordReader
的tf.parse_single_example
解析器。這個parse_single_example
操作可以將Example
協議內存塊(protocol buffer)解析為張量
批處理
在數據輸入管線的末端, 我們需要有另一個隊列來執行輸入樣本的訓練,評價和推理。因此我們使用tf.train.shuffle_batch
函數來對隊列中的樣本進行亂序處理
1 def read_my_file_format(filename_queue): 2 reader = tf.SomeReader() 3 key, record_string = reader.read(filename_queue) 4 example, label = tf.some_decoder(record_string) 5 processed_example = some_processing(example) 6 return processed_example, label 7 8 def input_pipeline(filenames, batch_size, num_epochs=None): 9 filename_queue = tf.train.string_input_producer( 10 filenames, num_epochs=num_epochs, shuffle=True) 11 example, label = read_my_file_format(filename_queue) #這里是單個reader在讀文件,batch是當個reader讀單個文件shuffle成batch 12 # min_after_dequeue defines how big a buffer we will randomly sample 13 # from -- bigger means better shuffling but slower start up and more 14 # memory used. 15 # capacity must be larger than min_after_dequeue and the amount larger 16 # determines the maximum we will prefetch. Recommendation: 17 # min_after_dequeue + (num_threads + a small safety margin) * batch_size 18 min_after_dequeue = 10000 19 capacity = min_after_dequeue + 3 * batch_size 20 example_batch, label_batch = tf.train.shuffle_batch( 21 [example, label], batch_size=batch_size, capacity=capacity, 22 min_after_dequeue=min_after_dequeue) 23 return example_batch, label_batch
tf.train.shuffle_batch()
從上面函數定義看出shuffle_batch的輸入是tensors,看代碼發現不是直接從queue中獲取batch,而是以一種自定義的方式讀取tensor后再獲取batch
如果你需要對不同文件中的樣子有更強的亂序和並行處理,可以使用tf.train.shuffle_batch_join
函數. 示例:
1 def read_my_file_format(filename_queue): 2 # Same as above 3 4 def input_pipeline(filenames, batch_size, read_threads, num_epochs=None): 5 filename_queue = tf.train.string_input_producer( 6 filenames, num_epochs=num_epochs, shuffle=True)
7 example_list = [read_my_file_format(filename_queue) 8 for _ in range(read_threads)] #這里是多個reader,batch_join將是多個reader讀多個文件然后構成batch 9 min_after_dequeue = 10000 10 capacity = min_after_dequeue + 3 * batch_size 11 example_batch, label_batch = tf.train.shuffle_batch_join( 12 example_list, batch_size=batch_size, capacity=capacity, 13 min_after_dequeue=min_after_dequeue) 14 return example_batch, label_batch
在這個例子中, 雖然只使用了一個文件名隊列, 但是TensorFlow依然能保證多個文件閱讀器從同一次迭代(epoch)的不同文件中讀取數據,直到這次迭代的所有文件都被開始讀取為止。(通常來說一個線程來對文件名隊列進行填充的效率是足夠的)
另一種替代方案是:使用tf.train.shuffle_batch
函數,設置num_threads
的值大於1。 這種方案可以保證同一時刻只在一個文件中進行讀取操作(但是讀取速度依然優於單線程),而不是之前的同時讀取多個文件。這種方案的優點是:
- 避免了兩個不同的線程從同一個文件中讀取同一個樣本。
- 避免了過多的磁盤搜索操作。
創建線程並使用QueueRunner對象來預取
使用上面的tf.train.string_input_producer和tf.train.shuffle_batch_join等函數添加QueueRunner到你的數據流圖中。在你運行任何訓練步驟之前,需要調用tf.train.start_queue_runners函數,否則數據流圖將一直掛起。tf.train.start_queue_runners這個函數將會啟動輸入管道的線程,填充樣本到隊列中,以便出隊操作可以從隊列中拿到樣本。這種情況下最好配合使用一個tf.train.Coordinator,這樣可以再發生錯誤的情況下正確下正確地關閉這些線程。如果對訓練迭代數做了限制,那么需要使用一個訓練迭代數計數器,並且需要被初始化。
1 # Create the graph, etc. 2 init_op = tf.initialize_all_variables() 3 4 # Create a session for running operations in the Graph. 5 sess = tf.Session() 6 7 # Initialize the variables (like the epoch counter). 8 sess.run(init_op) #沒有初始化會報錯 9 10 # Start input enqueue threads. 11 coord = tf.train.Coordinator() 12 threads = tf.train.start_queue_runners(sess=sess, coord=coord) 13 14 try: 15 while not coord.should_stop(): 16 # Run training steps or whatever 17 sess.run(train_op) 18 19 except tf.errors.OutOfRangeError: 20 print 'Done training -- epoch limit reached' 21 finally: 22 # When done, ask the threads to stop. 23 coord.request_stop() 24 25 # Wait for threads to finish. 26 coord.join(threads) 27 sess.close()
具體的數據輸入流程如下圖所示:
先單個線程構建文件名隊列,然后用reader解析每個文件名中的數據,一個reader負責一個文件名中的數據,reader解析完數據樣本后,tf.train.shuffle_batch構建樣本隊列,對於單個reader就是按順序對每個文件中數據shuffle輸入隊列。多個reader就是同時對不同的文件中樣本數據shuffle輸入隊列。
因為開始就運行了這些入隊操作的線程,所以訓練循環會使得樣本隊列中中的樣本不斷地出隊。
在tf.train
中要創建這些隊列和執行入隊操作,就要添加tf.train.QueueRunner
到一個使用tf.train.add_queue_runner
函數的數據流圖中。每個QueueRunner
負責一個階段,處理那些需要在線程中運行的入隊操作的列表。一旦數據流圖構造成功,tf.train.start_queue_runners
函數就會要求數據流圖中每個QueueRunner
去開始它的線程運行入隊操作。
如果一切順利的話,你現在可以執行你的訓練步驟,同時隊列也會被后台線程來填充。如果您設置了最大訓練迭代數,在某些時候,樣本出隊的操作可能會得到一個tf.OutOfRangeError
的錯誤。這其實是TensorFlow的“文件結束”(EOF) ———— 這就意味着已經達到了最大訓練迭代數,已經沒有更多可用的樣本了。
最后一個因素是Coordinator
。這是負責在收到任何關閉信號的時候,讓所有的線程都知道。最常用的是在發生異常時這種情況就會呈現出來,比如說其中一個線程在運行某些操作時出現錯誤(或一個普通的Python異常)。
Queue
Queue
是TF隊列和緩存機制的實現QueueRunner
是TF中對操作Queue的線程的封裝Coordinator
是TF中用來協調線程運行的工具
1 import tensorflow as tf 2 tf.InteractiveSession() 3 4 q = tf.FIFOQueue(2, "float") 5 init = q.enqueue_many(([0,0],)) 6 7 x = q.dequeue() 8 y = x+1 9 q_inc = q.enqueue([y]) 10 11 init.run() 12 q_inc.run() 13 q_inc.run() 14 q_inc.run() 15 x.eval() # 返回1 16 x.eval() # 返回2 17 x.eval() # 卡住
QueueRunner
Tensorflow的計算主要在使用CPU/GPU和內存,而數據讀取涉及磁盤操作,速度遠低於前者操作。因此通常會使用多個線程讀取數據,然后使用一個線程消費數據。QueueRunner就是來管理這些讀寫隊列的線程的。
QueueRunner需要與Queue一起使用(這名字已經注定了它和Queue脫不開干系),但並不一定必須使用Coordinator。
Coordinator
Coordinator是個用來保存線程組運行狀態的協調器對象,它和TensorFlow的Queue沒有必然關系,是可以單獨和Python線程使用的。
1 import tensorflow as tf 2 import threading, time 3 4 # 子線程函數 5 def loop(coord, id): 6 t = 0 7 while not coord.should_stop(): 8 print(id) 9 time.sleep(1) 10 t += 1 11 # 只有1號線程調用request_stop方法 12 if (t >= 2 and id == 1): 13 coord.request_stop() 14 15 # 主線程 16 coord = tf.train.Coordinator() 17 # 使用Python API創建10個線程 18 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)] 19 20 # 啟動所有線程,並等待線程結束 21 for t in threads: t.start() 22 coord.join(threads)
將這個程序運行起來,會發現所有的子線程執行完兩個周期后都會停止,主線程會等待所有子線程都停止后結束,從而使整個程序結束。由此可見,只要有任何一個線程調用了Coordinator的request_stop
方法,所有的線程都可以通過should_stop
方法感知並停止當前線程。
將QueueRunner和Coordinator一起使用,實際上就是封裝了這個判斷操作,從而使任何一個現成出現異常時,能夠正常結束整個程序,同時主線程也可以直接調用request_stop
方法來停止所有子線程的執行。
Queue的使用都是配合了QueueRunner和Coordinator一起使用的
- 第一種,顯式的創建QueueRunner,然后調用它的create_threads方法啟動線程。
- 第二種,使用全局的start_queue_runners方法啟動線程
在這個例子中,tf.train.string_input_produecer
會將一個隱含的QueueRunner添加到全局圖中(類似的操作還有tf.train.shuffle_batch
等)。
由於沒有顯式地返回QueueRunner來用create_threads啟動線程,這里使用了tf.train.start_queue_runners
方法直接啟動tf.GraphKeys.QUEUE_RUNNERS
集合中的所有隊列線程。這兩種方式在效果上是等效的。
參考鏈接
http://honggang.io/2016/08/19/tensorflow-data-reading/ 解釋的相當清楚,還有具體例子說明 必看{❤❤}
https://saicoco.github.io/tf3/ 解釋的相當棒
http://www.jianshu.com/p/d063804fb272 理解tensorflow中的Queue