tensorflow數據讀取機制
tensorflow中為了充分利用GPU,減少GPU等待數據的空閑時間,使用了兩個線程分別執行數據讀入和數據計算。
具體來說就是使用一個線程源源不斷的將硬盤中的圖片數據讀入到一個內存隊列中,另一個線程負責計算任務,所需數據直接從內存隊列中獲取。
tf在內存隊列之前,還設立了一個文件名隊列,文件名隊列存放的是參與訓練的文件名,要訓練 N個epoch,則文件名隊列中就含有N個批次的所有文件名。 示例圖如下:
圖片來至於 https://zhuanlan.zhihu.com/p/27238630)
在N個epoch的文件名最后是一個結束標志,當tf讀到這個結束標志的時候,會拋出一個 OutofRange 的異常,外部捕獲到這個異常之后就可以結束程序了。而創建tf的文件名隊列就需要使用到 tf.train.slice_input_producer 函數。
tf.train.slice_input_producer
tf.train.slice_input_producer是一個tensor生成器,作用是按照設定,每次從一個tensor列表中按順序或者隨機抽取出一個tensor放入文件名隊列。
slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
capacity=32, shared_name=None, name=None)
- 第一個參數 tensor_list:包含一系列tensor的列表,表中tensor的第一維度的值必須相等,即個數必須相等,有多少個圖像,就應該有多少個對應的標簽。
- 第二個參數num_epochs: 可選參數,是一個整數值,代表迭代的次數,如果設置 num_epochs=None,生成器可以無限次遍歷tensor列表,如果設置為 num_epochs=N,生成器只能遍歷tensor列表N次。
- 第三個參數shuffle: bool類型,設置是否打亂樣本的順序。一般情況下,如果shuffle=True,生成的樣本順序就被打亂了,在批處理的時候不需要再次打亂樣本,使用 tf.train.batch函數就可以了;如果shuffle=False,就需要在批處理時候使用 tf.train.shuffle_batch函數打亂樣本。
- 第四個參數seed: 可選的整數,是生成隨機數的種子,在第三個參數設置為shuffle=True的情況下才有用。
- 第五個參數capacity:設置tensor列表的容量。
- 第六個參數shared_name:可選參數,如果設置一個‘shared_name’,則在不同的上下文環境(Session)中可以通過這個名字共享生成的tensor。
- 第七個參數name:可選,設置操作的名稱。
tf.train.slice_input_producer定義了樣本放入文件名隊列的方式,包括迭代次數,是否亂序等,要真正將文件放入文件名隊列,還需要調用tf.train.start_queue_runners 函數來啟動執行文件名隊列填充的線程,之后計算單元才可以把數據讀出來,否則文件名隊列為空的,計算單元就會處於一直等待狀態,導致系統阻塞。
tf.train.slice_input_producer 和 tf.train.start_queue_runners 使用:
import tensorflow as tf images = ['img1', 'img2', 'img3', 'img4', 'img5'] labels= [1,2,3,4,5] epoch_num=8 f = tf.train.slice_input_producer([images, labels],num_epochs=None,shuffle=False) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) for i in range(epoch_num): k = sess.run(f) print '************************' print (i,k) coord.request_stop() coord.join(threads)
tf.train.slice_input_producer函數中shuffle=False,不對tensor列表亂序,輸出:
************************ (0, ['img1', 1]) ************************ (1, ['img2', 2]) ************************ (2, ['img3', 3]) ************************ (3, ['img4', 4]) ************************ (4, ['img5', 5]) ************************ (5, ['img1', 1]) ************************ (6, ['img2', 2]) ************************ (7, ['img3', 3])
如果設置shuffle=True,輸出亂序:
************************ (0, ['img5', 5]) ************************ (1, ['img4', 4]) ************************ (2, ['img1', 1]) ************************ (3, ['img3', 3]) ************************ (4, ['img2', 2]) ************************ (5, ['img3', 3]) ************************ (6, ['img2', 2]) ************************ (7, ['img1', 1])
tf.train.batch
tf.train.batch是一個tensor隊列生成器,作用是按照給定的tensor順序,把batch_size個tensor推送到文件隊列,作為訓練一個batch的數據,等待tensor出隊執行計算。
batch(tensors, batch_size, num_threads=1, capacity=32, enqueue_many=False, shapes=None, dynamic_pad=False, allow_smaller_final_batch=False, shared_name=None, name=None)
- 第一個參數tensors:tensor序列或tensor字典,可以是含有單個樣本的序列;
- 第二個參數batch_size: 生成的batch的大小;
- 第三個參數num_threads:執行tensor入隊操作的線程數量,可以設置使用多個線程同時並行執行,提高運行效率,但也不是數量越多越好;
- 第四個參數capacity: 定義生成的tensor序列的最大容量;
- 第五個參數enqueue_many: 定義第一個傳入參數tensors是多個tensor組成的序列,還是單個tensor;
- 第六個參數shapes: 可選參數,默認是推測出的傳入的tensor的形狀;
- 第七個參數dynamic_pad: 定義是否允許輸入的tensors具有不同的形狀,設置為True,會把輸入的具有不同形狀的tensor歸一化到相同的形狀;
- 第八個參數allow_smaller_final_batch: 設置為True,表示在tensor隊列中剩下的tensor數量不夠一個batch_size的情況下,允許最后一個batch的數量少於batch_size, 設置為False,則不管什么情況下,生成的batch都擁有batch_size個樣本;
- 第九個參數shared_name: 可選參數,設置生成的tensor序列在不同的Session中的共享名稱;
- 第十個參數name: 操作的名稱;
如果tf.train.batch的第一個參數 tensors 傳入的是tenor列表或者字典,返回的是tensor列表或字典,如果傳入的是只含有一個元素的列表,返回的是單個的tensor,而不是一個列表。
以下舉例: 一共有5個樣本,設置迭代次數是2次,每個batch中含有3個樣本,不打亂樣本順序:
# -*- coding:utf-8 -*- import tensorflow as tf import numpy as np # 樣本個數 sample_num=5 # 設置迭代次數 epoch_num = 2 # 設置一個批次中包含樣本個數 batch_size = 3 # 計算每一輪epoch中含有的batch個數 batch_total = int(sample_num/batch_size)+1 # 生成4個數據和標簽 def generate_data(sample_num=sample_num): labels = np.asarray(range(0, sample_num)) images = np.random.random([sample_num, 224, 224, 3]) print('image size {},label size :{}'.format(images.shape, labels.shape)) return images,labels def get_batch_data(batch_size=batch_size): images, label = generate_data() # 數據類型轉換為tf.float32 images = tf.cast(images, tf.float32) label = tf.cast(label, tf.int32) #從tensor列表中按順序或隨機抽取一個tensor input_queue = tf.train.slice_input_producer([images, label], shuffle=False) image_batch, label_batch = tf.train.batch(input_queue, batch_size=batch_size, num_threads=1, capacity=64) return image_batch, label_batch image_batch, label_batch = get_batch_data(batch_size=batch_size) with tf.Session() as sess: coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess, coord) try: for i in range(epoch_num): # 每一輪迭代 print '************' for j in range(batch_total): #每一個batch print '--------' # 獲取每一個batch中batch_size個樣本和標簽 image_batch_v, label_batch_v = sess.run([image_batch, label_batch]) # for k in print(image_batch_v.shape, label_batch_v) except tf.errors.OutOfRangeError: print("done") finally: coord.request_stop() coord.join(threads)
輸出:
************ -------- ((3, 224, 224, 3), array([0, 1, 2], dtype=int32)) -------- ((3, 224, 224, 3), array([3, 4, 0], dtype=int32)) ************ -------- ((3, 224, 224, 3), array([1, 2, 3], dtype=int32)) -------- ((3, 224, 224, 3), array([4, 0, 1], dtype=int32))
每次生成的batch中含有3個樣本,不打亂次序,所以生成的tensor序列是按照‘0,1,2,3,4,0,1,2,3……’排列的。
如果設置每個batch中含有2個樣本,打亂次序,即設置 batch_size = 2, tf.train.slice_input_producer函數中 shuffle=True,輸出為:
************ -------- ((2, 224, 224, 3), array([3, 0], dtype=int32)) -------- ((2, 224, 224, 3), array([4, 1], dtype=int32)) -------- ((2, 224, 224, 3), array([2, 3], dtype=int32)) ************ -------- ((2, 224, 224, 3), array([1, 0], dtype=int32)) -------- ((2, 224, 224, 3), array([2, 4], dtype=int32)) -------- ((2, 224, 224, 3), array([1, 4], dtype=int32))
與tf.train.batch函數相對的還有一個tf.train.shuffle_batch函數,兩個函數作用一樣,都是生成一定數量的tensor,組成訓練一個batch需要的數據集,區別是tf.train.shuffle_batch會打亂樣本順序。
轉自:https://blog.csdn.net/dcrmg/article/details/79776876