Tensorflow學習-數據讀取


Tensorflow數據讀取方式主要包括以下三種

  1. Preloaded data:預加載數據
  2. Feeding: 通過Python代碼讀取或者產生數據,然后給后端
  3. Reading from file: 通過TensorFlow隊列機制,從文件中直接讀取數據

前兩種方法比較基礎而且容易理解,在Tensorflow入門教程、書本中經常可以見到,這里不再進行介紹。

在介紹Tensorflow第三種讀取數據方法之前,介紹以下有關隊列相關知識

Queue(隊列)

隊列是用來存放數據的,並且tensorflow中的Queue中已經實現了同步機制,所以我們可以放心的往里面添加數據還有讀取數據。如果Queue中的數據滿了,那么en_queue(隊列添加元素)操作將會阻塞,如果Queue是空的,那么dequeue(隊列拋出元素)操作就會阻塞.在常用環境中,一般是有多個en_queue線程同時像Queue中放數據,有一個dequeue操作從Queue中取數據。

Coordinator(協調管理器)

Coordinator主要是用來幫助管理多個線程,協調多線程之間的配合

 1 # Thread body: loop until the coordinator indicates a stop was requested.
 2 # If some condition becomes true, ask the coordinator to stop.
 3 #將coord傳入到線程中,來幫助它們同時停止工作
 4 def MyLoop(coord):
 5   while not coord.should_stop():
 6     ...do something...
 7     if ...some condition...:
 8       coord.request_stop()
 9 # Main thread: create a coordinator.
10 coord = tf.train.Coordinator()
11 # Create 10 threads that run 'MyLoop()'
12 threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]
13 # Start the threads and wait for all of them to stop.
14 for t in threads:
15   t.start()
16 coord.join(threads)

 

QueueRunner()

QueueRunner可以創建多個線程對隊列(queue)進行插入(enqueue)操作,它是一個op,這些線程可以通過上述的Coordinator協調器來協調工作。

在深度學習中樣本數據集有多種存儲編碼形式,以經典數據集Cifar-10為例,公開共下載的數據有三種存儲方式:Bin(二進制)、Python以及Matlab版本。此外,我們常用的還有csv(天池競賽、百度競賽等)比較常見或txt等,當然對圖片存儲最為直觀的還是可視化展示的TIF、PNG、JPG等。Tensorflow官方推薦使用他自己的一種文件格式叫TFRecord,具體實現及應用會在以后詳細介紹。

從上圖中可知,Tensorflow數據讀取過程主要包括兩個隊列(FIFO),一個叫做文件隊列,主要用作對輸入樣本文件的管理(可以想象,所有的訓練數據一般不會存儲在一個文件內,該部分主要完成對數據文件的管理);另一個叫做數據隊列,如果對應的數據是圖像可以認為該隊列中的每一項都是存儲在內存中的解碼后的一系列圖像像素值。

下面,我們分別新建3個csv文件->A.csv;B.csv;C.csv,每個文件下分別用X_i, y_i代表訓練樣本的數據及標注信息。

 1 #-*- coding:gbk -*-
 2 import tensorflow as tf
 3 # 隊列1:生成一個先入先出隊列和一個QueueRunner,生成文件名隊列
 4 filenames = ['A.csv', 'B.csv', 'C.csv']
 5 filename_queue = tf.train.string_input_producer(filenames, shuffle=False, num_epochs=2)
 6 # 定義Reader
 7 reader = tf.TextLineReader()
 8 key, value = reader.read(filename_queue)
 9 # 定義Decoder 
10 example, label = tf.decode_csv(value, record_defaults=[['string'], ['string']])
11 with tf.Session() as sess:
12     coord = tf.train.Coordinator() #創建一個協調器,管理線程
13     threads = tf.train.start_queue_runners(coord=coord) #啟動QueueRunner, 此時文件名隊列已經進隊。
14     for i in range(12):
15         e_val, l_val = sess.run([example, label])
16         print(e_val, l_val)
17     coord.request_stop()
18     coord.join(threads)

 

程序中,首先根據文件列表,通過tf.train.string_input_producer(filenames, shuffle=False)函數建立了一個對應的文件管理隊列,其中shuffle=False表 示不對文件順序進行打亂(True表示打亂,每次輸出順序將不再一致)。此外,還可通過設置第三個參數num_epochs來控制文件數據多少。
運行結果如下:

上段程序中,主要完成以下幾方面工作:

  1. 針對文件名列表,建立對應的文件隊列
  2. 使用reader讀取對應文件數據集
  3. 解碼數據集,得到樣本example和標注label

感興趣的讀者可以打開tf.train.string_input_producer(...)函數,可以看到如下代碼

 1     """ 
 2  @compatibility(eager)
 3   Input pipelines based on Queues are not supported when eager execution is
 4   enabled. Please use the `tf.data` API to ingest data under eager execution.
 5   @end_compatibility
 6   """
 7   if context.in_eager_mode():
 8     raise RuntimeError(
 9         "Input pipelines based on Queues are not supported when eager execution"
10         " is enabled. Please use tf.data to ingest data into your model"
11         " instead.")
12   with ops.name_scope(name, "input_producer", [input_tensor]):
13     input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor")
14     element_shape = input_tensor.shape[1:].merge_with(element_shape)
15     if not element_shape.is_fully_defined():
16       raise ValueError("Either `input_tensor` must have a fully defined shape "
17                        "or `element_shape` must be specified")
18     if shuffle:
19       input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)
20     input_tensor = limit_epochs(input_tensor, num_epochs)
21     q = data_flow_ops.FIFOQueue(capacity=capacity,
22                                 dtypes=[input_tensor.dtype.base_dtype],
23                                 shapes=[element_shape],
24                                 shared_name=shared_name, name=name)
25     enq = q.enqueue_many([input_tensor])
26     queue_runner.add_queue_runner(
27         queue_runner.QueueRunner(
28             q, [enq], cancel_op=cancel_op))
29     if summary_name is not None:
30       summary.scalar(summary_name,
31                      math_ops.to_float(q.size()) * (1. / capacity))
32     return q

 

可以看到該段代碼主要完成以下工作:

  1. 創建隊列Queue
  2. 創建線程enqueue_many
  3. 添加QueueRunner到collection中
  4. 返回隊列Queue

數據解析

1 # 定義Reader
2 reader = tf.TextLineReader()
3 key, value = reader.read(filename_queue)
4 # 定義Decoder 
5 example, label = tf.decode_csv(value, record_defaults=[['string'], ['string']])

 

這里,我們通過定義一個reader來讀取每個數據文件內容,也可圖中也展示了TensorFlow支持定義多個reader並且讀取文件隊列中文件內容,從而提供數據讀取效率。然后,采用一個decoder_csv函數對讀取的原始CSV文件內容進行解碼,平時我們也可根據自己數據存儲格式選擇不同數據解碼方式。在這里需要指出的是,上述程序中並沒有用到圖中展示的第二個數據隊列,這是為什么呢。

實際上做深度學習or機器學習訓練過程中,為了保證訓練過程的高效性通常不采用單個樣本數據給訓練模型,而是采用一組N個數據(稱作mini-batch),並把每組樣本個數N成為batch-size。現在假設我們每組需要喂給模型N個數據,需通過N次循環讀入內存,然后再通過GPU進行前向or返向傳播運算,這就意味着GPU每次運算都需要一段時間等待CPU讀取數據,從而大大降低了訓練效率。而第二個隊列(數據隊列)就是為了解決這個問題提出來的,代碼實現即為:tf.train.batch()和 tf.train.shuffle_batch,這兩個函數的主要區別在於是否需要將列表中數據進行隨機打亂。

 1 #-*- coding:gbk -*-
 2 import tensorflow as tf
 3 # 生成一個先入先出隊列和一個QueueRunner,生成文件名隊列
 4 filenames = ['A.csv', 'B.csv', 'C.csv']
 5 filename_queue = tf.train.string_input_producer(filenames, shuffle=False, num_epochs=3)
 6 # 定義Reader
 7 reader = tf.TextLineReader()
 8 key, value = reader.read(filename_queue)
 9 # 定義Decoder 
10 example, label = tf.decode_csv(value, record_defaults=[['string'], ['string']])
11 #example_batch, label_batch = tf.train.shuffle_batch([example,label], batch_size=16, capacity=200, min_after_dequeue=100, num_threads=2)
12 example_batch, label_batch = tf.train.batch([example,label], batch_size=8, capacity=200, num_threads=2)
13 #example_list = [tf.decode_csv(value, record_defaults=[['string'], ['string']])
14 #         for _ in range(2)] # Reader設置為2
15 ### 使用tf.train.batch_join(),可以使用多個reader,並行讀取數據。每個Reader使用一個線程。
16 #example_batch, label_batch = tf.train.batch_join(
17 #   example_list, batch_size=5)
18 init_local_op = tf.initialize_local_variables()
19 with tf.Session() as sess:
20     sess.run(init_local_op)
21     coord = tf.train.Coordinator() #創建一個協調器,管理線程
22     threads = tf.train.start_queue_runners(coord=coord) #啟動QueueRunner, 此時文件名隊列已經進隊。
23     for i in range(5):
24 # Retrieve a single instance:
25         e_val, l_val = sess.run([example_batch, label_batch])
26         print(e_val, l_val)
27     coord.request_stop()
28     coord.join(threads)

 

使用tf.train.batch()函數,每次根據自己定義大小會返回一組訓練數據,從而避免了往內存中循環讀取數據的麻煩,提高了效率。並且還可以通過設置reader個數,實現多線程高效地往數據隊列(或叫內存隊列)中填充數據,直到文件隊列讀完所有文件(或文件數據不足一個batch size)。
tf.train.batch()程序運行結果如下

注:tf.train.batch([example,label], batch_size=8, capacity=200, num_threads=2)參數中,capacity表示隊列大小,每次讀出數據后隊尾會按順序依次補充。num_treads=2表示兩個線程(據說在一個reader下可達到最優),batch_size=8表示每次返回8組訓練數據,即batch size大小。tf.train.shuffle_batch()比tf.train.bathc()多一個min_after_dequeue參數,意思是在每次拋出一個batch后,剩余數據樣本不少於多少個。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM