tensorflow之數據讀取探究(1)


Tensorflow中之前主要用的數據讀取方式主要有

  1. 建立placeholder,然后使用feed_dict將數據feed進placeholder進行使用。使用這種方法十分靈活,可以一下子將所有數據讀入內存,然后分batch進行feed;也可以建立一個Python的generator,一個batch一個batch的將數據讀入,並將其feed進placeholder。這種方法很直觀,用起來也比較方便靈活jian,但是這種方法的效率較低,難以滿足高速計算的需求。
  2. 使用TensorFlow的QueueRunner,通過一系列的Tensor操作,將磁盤上的數據分批次讀入並送入模型進行使用。這種方法效率很高,但因為其牽涉到Tensor操作,不夠直觀,也不方便調試,所有有時候會顯得比較困難。使用這種方法時,常用的一些操作包括tf.TextLineReader,tf.FixedLengthRecordReader以及tf.decode_raw等等。如果需要循環,條件操作,還需要使用TensorFlow的tf.while_loop,tf.case等操作。
  3. Tensorflow API: tf.data.Dataset使用: https://blog.csdn.net/ssmixi/article/details/80572813

               ---參考:https://github.com/titu1994/neural-image-assessment/blob/master/utils/data_loader.py

                             https://github.com/arieszhang1994/NIMA/blob/master/train_tf.py

這里寫圖片描述

## dataloader實現
- (Pytorch)本身Pytorch數據處理比較規范:先定義Datasets類,初始化時生成list,實現__getitem__進行讀取元素,並進行預處理操作,是一個可迭代對象;Dataloader類也是可迭代對象,iter(Dataloader)可實現next()訪問,多線程加速batch data的處理,使用yield來使用有限的內存;內部還有queue操作;
- (通用)讀數據最直接的方法:先准備所有樣本list(image_name,image_label),在一個epoch中,shuffle后;依次按照index = (每個epoch進行迭代的次數len(list)/batch_size)*batch_size + j(當前batch的第j個文件),這樣來讀取一張圖,做預處理,收集滿一個batch的數據;然后在喂進網絡訓練;(reference: https://github.com/GeorgeSeif/Semantic-Segmentation-Suite/blob/master/train.py ;https://github.com/joelthchao/tensorflow-finetune-flickr-style/blob/master/dataset.py),https://github.com/dgurkaynak/tensorflow-cnn-finetune/blob/master/utils/preprocessor.py]; 這樣讀取數據的正本多少呢?
- (Caffe)第二步非常慢:可以參考caffe自定義數據層:https://www.cnblogs.com/ranjiewen/p/9903826.html;該方式進行多進程讀取數據process;
- Tensorflow讀取數據方式:

上面鏈接的方式是通過tf.train.slice_input_producer()直接讀取文件list, tf.train.slice_input_producer()也可以讀取tfrecords文件;

        self.image_list, self.label_list = read_labeled_image_list(self.data_dir, self.data_list)
        self.images = tf.convert_to_tensor(self.image_list, dtype=tf.string)
        self.labels = tf.convert_to_tensor(self.label_list, dtype=tf.string)
        self.queue = tf.train.slice_input_producer([self.images, self.labels],
                                                   shuffle=input_size is not None) # not shuffling if it is val
self.image, self.label = read_images_from_disk(self.queue, self.input_size, random_scale, random_mirror, ignore_label, img_mean) 

       image_batch, label_batch = tf.train.batch([self.image, self.label],
num_elements)

一、tensorflow讀取機制圖解

首先需要思考的一個問題是,什么是數據讀取?以圖像數據為例,讀取數據的過程可以用下圖來表示:

假設我們的硬盤中有一個圖片數據集0001.jpg,0002.jpg,0003.jpg……我們只需要把它們讀取到內存中,然后提供給GPU或是CPU進行計算就可以了。這聽起來很容易,但事實遠沒有那么簡單。事實上,我們必須要把數據先讀入后才能進行計算,假設讀入用時0.1s,計算用時0.9s,那么就意味着每過1s,GPU都會有0.1s無事可做,這就大大降低了運算的效率。

如何解決這個問題?方法就是將讀入數據和計算分別放在兩個線程中,將數據讀入內存的一個隊列,如下圖所示:

讀取線程源源不斷地將文件系統中的圖片讀入到一個內存的隊列中,而負責計算的是另一個線程,計算需要數據時,直接從內存隊列中取就可以了。這樣就可以解決GPU因為IO而空閑的問題!

而在tensorflow中,為了方便管理,在內存隊列前又添加了一層所謂的“文件名隊列”

為什么要添加這一層文件名隊列?我們首先得了解機器學習中的一個概念:epoch。對於一個數據集來講,運行一個epoch就是將這個數據集中的圖片全部計算一遍。如一個數據集中有三張圖片A.jpg、B.jpg、C.jpg,那么跑一個epoch就是指對A、B、C三張圖片都計算了一遍。兩個epoch就是指先對A、B、C各計算一遍,然后再全部計算一遍,也就是說每張圖片都計算了兩遍。

tensorflow使用文件名隊列+內存隊列雙隊列的形式讀入文件,可以很好地管理epoch。下面我們用圖片的形式來說明這個機制的運行方式。如下圖,還是以數據集A.jpg, B.jpg, C.jpg為例,假定我們要跑一個epoch,那么我們就在文件名隊列中把A、B、C各放入一次,並在之后標注隊列結束。

程序運行后,內存隊列首先讀入A(此時A從文件名隊列中出隊):再依次讀入B和C:

此時,如果再嘗試讀入,系統由於檢測到了“結束”,就會自動拋出一個異常(OutOfRange)。外部捕捉到這個異常后就可以結束程序了。這就是tensorflow中讀取數據的基本機制。如果我們要跑2個epoch而不是1個epoch,那只要在文件名隊列中將A、B、C依次放入兩次再標記結束就可以了。

二、tensorflow讀取數據機制的對應函數

如何在tensorflow中創建上述的兩個隊列呢?

對於文件名隊列,我們使用tf.train.string_input_producer函數。這個函數需要傳入一個文件名list,系統會自動將它轉為一個文件名隊列。

此外tf.train.string_input_producer還有兩個重要的參數,一個是num_epochs,它就是我們上文中提到的epoch數。另外一個就是shuffle,shuffle是指在一個epoch內文件的順序是否被打亂。若設置shuffle=False,如下圖,每個epoch內,數據還是按照A、B、C的順序進入文件名隊列,這個順序不會改變:

如果設置shuffle=True,那么在一個epoch內,數據的前后順序就會被打亂,如下圖所示:

在tensorflow中,內存隊列不需要我們自己建立,我們只需要使用reader對象從文件名隊列中讀取數據就可以了,具體實現可以參考下面的實戰代碼。

除了tf.train.string_input_producer外,我們還要額外介紹一個函數:tf.train.start_queue_runners。初學者會經常在代碼中看到這個函數,但往往很難理解它的用處,在這里,有了上面的鋪墊后,我們就可以解釋這個函數的作用了。

在我們使用tf.train.string_input_producer創建文件名隊列后,整個系統其實還是處於“停滯狀態”的,也就是說,我們文件名並沒有真正被加入到隊列中(如下圖所示)。此時如果我們開始計算,因為內存隊列中什么也沒有,計算單元就會一直等待,導致整個系統被阻塞。

而使用tf.train.start_queue_runners之后,才會啟動填充隊列的線程,這時系統就不再“停滯”。此后計算單元就可以拿到數據並進行計算,整個程序也就跑起來了,這就是函數tf.train.start_queue_runners的用處。

 

- 主要區別幾個函數的使用方法:

- tf.train.slice_input_producer(): slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None)

tf.train.slice_input_producer是一個tensor生成器,作用是按照設定,每次從一個tensor列表中按順序或者隨機抽取出一個tensor放入文件名隊列。

第一個參數 tensor_list:包含一系列tensor的列表,表中tensor的第一維度的值必須相等,即個數必須相等,有多少個圖像,就應該有多少個對應的標簽。
第二個參數num_epochs: 可選參數,是一個整數值,代表迭代的次數,如果設置 num_epochs=None,生成器可以無限次遍歷tensor列表,如果設置為 num_epochs=N,生成器只能遍歷tensor列表N次。
第三個參數shuffle: bool類型,設置是否打亂樣本的順序。一般情況下,如果shuffle=True,生成的樣本順序就被打亂了,在批處理的時候不需要再次打亂樣本,使用 tf.train.batch函數就可以了;如果shuffle=False,就需要在批處理時候使用 tf.train.shuffle_batch函數打亂樣本。
第四個參數seed: 可選的整數,是生成隨機數的種子,在第三個參數設置為shuffle=True的情況下才有用。
第五個參數capacity:設置tensor列表的容量。
第六個參數shared_name:可選參數,如果設置一個‘shared_name’,則在不同的上下文環境(Session)中可以通過這個名字共享生成的tensor。
第七個參數name:可選,設置操作的名稱。

- 使用tf.train.string_input_producer()

"""Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue will be able to access it via the shared_name. Using this in
      a distributed setting means each name will only be seen by one of the
      sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).

  Returns:
    A queue with the output strings.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

- tf.train.batch(): tf.train.batch是一個tensor隊列生成器,作用是按照給定的tensor順序,把batch_size個tensor推送到文件隊列,作為訓練一個batch的數據,等待tensor出隊執行計算。

batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None) 
第一個參數tensors:tensor序列或tensor字典,可以是含有單個樣本的序列;
第二個參數batch_size: 生成的batch的大小;
第三個參數num_threads:執行tensor入隊操作的線程數量,可以設置使用多個線程同時並行執行,提高運行效率,但也不是數量越多越好;
第四個參數capacity: 定義生成的tensor序列的最大容量;
第五個參數enqueue_many: 定義第一個傳入參數tensors是多個tensor組成的序列,還是單個tensor;
第六個參數shapes: 可選參數,默認是推測出的傳入的tensor的形狀;
第七個參數dynamic_pad: 定義是否允許輸入的tensors具有不同的形狀,設置為True,會把輸入的具有不同形狀的tensor歸一化到相同的形狀;
第八個參數allow_smaller_final_batch: 設置為True,表示在tensor隊列中剩下的tensor數量不夠一個batch_size的情況下,允許最后一個batch的數量少於batch_size, 設置為False,則不管什么情況下,生成的batch都擁有batch_size個樣本;
第九個參數shared_name: 可選參數,設置生成的tensor序列在不同的Session中的共享名稱;
第十個參數name: 操作的名稱;
# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np
 
# 樣本個數
sample_num=5
# 設置迭代次數
epoch_num = 2
# 設置一個批次中包含樣本個數
batch_size = 3
# 計算每一輪epoch中含有的batch個數
batch_total = int(sample_num/batch_size)+1
 
# 生成4個數據和標簽
def generate_data(sample_num=sample_num):
    labels = np.asarray(range(0, sample_num))
    images = np.random.random([sample_num, 224, 224, 3])
    print('image size {},label size :{}'.format(images.shape, labels.shape))
 
    return images,labels
 
def get_batch_data(batch_size=batch_size):
    images, label = generate_data()
    # 數據類型轉換為tf.float32
    images = tf.cast(images, tf.float32)
    label = tf.cast(label, tf.int32)
 
    #從tensor列表中按順序或隨機抽取一個tensor
    input_queue = tf.train.slice_input_producer([images, label], shuffle=False)
 
    image_batch, label_batch = tf.train.batch(input_queue, batch_size=batch_size, num_threads=1, capacity=64)
    return image_batch, label_batch
 
image_batch, label_batch = get_batch_data(batch_size=batch_size)
 
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess, coord)
    try:
        for i in range(epoch_num):  # 每一輪迭代
            print '************'
            for j in range(batch_total): #每一個batch
                print '--------'
                # 獲取每一個batch中batch_size個樣本和標簽
                image_batch_v, label_batch_v = sess.run([image_batch, label_batch])
                # for k in
                print(image_batch_v.shape, label_batch_v)
    except tf.errors.OutOfRangeError:
        print("done")
    finally:
        coord.request_stop()
    coord.join(threads)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM