TensorFlow開發實踐經驗(一):優化數據獲取的效率


本文整理了TensorFlow中的數據讀取方法,在TensorFlow中主要有三種方法讀取數據:

1. 供給數據(Feeding):在TensorFlow程序運行的每一步, 讓Python代碼來供給數據。

2. 預加載數據(Preloaded data):在TensorFlow圖中定義常量或變量來保存所有數據(僅適用於數據量比較小的情況)。

3. 從文件讀取數據(Reading from files):在TensorFlow圖的起始, 讓一個輸入管線從文件中讀取數據。

 

對於數據量較小而言,可能一般選擇直接將數據加載進內存,然后再分batch輸入網絡進行訓練(tip:使用這種方法時,結合yield 使用更為簡潔,大家自己嘗試一下吧,我就不贅述了)。但是,如果數據量較大,這樣的方法就不適用了,因為太耗內存,所以這時最好使用tensorflow提供的隊列queue,也就是第3種方法 從文件讀取數據。

 

供給數據(Feeding)

我們一般用tf.placeholder節點來feed數據,該節點不需要初始化也不包含任何數據,我們在執行run()或者eval()指令時通過feed_dict參數把數據傳入graph中來計算。如果在運行過程中沒有對tf.placeholder節點傳入數據,程序會報錯。例如:

import tensorflow as tf
# 設計Graph
x1 = tf.placeholder(tf.int16)
x2 = tf.placeholder(tf.int16)
y = tf.add(x1, x2)
# 用Python產生數據
li1 = [2, 3, 4]
li2 = [4, 0, 1]
# 打開一個session --> 喂數據 --> 計算y
with tf.Session() as sess:
    print sess.run(y, feed_dict={x1: li1, x2: li2})

 

預加載數據(Preloaded data)

預加載數據方法僅限於用在可以完全加載到內存中的小數據集上,主要有兩種方法:

  1. 把數據存在常量(constant)中。
  2. 把數據存在變量(variable)中,我們初始化並且永不改變它的值。

用常量更簡單些,但會占用更多的內存,因為常量存儲在graph數據結構內部。例如:

training_data = ...
training_labels = ...
with tf.Session():
  input_data = tf.constant(training_data)
  input_labels = tf.constant(training_labels)
  ...


如果用變量的話,我們需要在graph構建好之后初始化該變量。例如:

training_data = ...
training_labels = ...
with tf.Session() as sess:
  data_initializer = tf.placeholder(dtype=training_data.dtype,
                                    shape=training_data.shape)
  label_initializer = tf.placeholder(dtype=training_labels.dtype,
                                     shape=training_labels.shape)
  input_data = tf.Variable(data_initializer, trainable=False, collections=[])
  input_labels = tf.Variable(label_initializer, trainable=False, collections=[])
  ...
  sess.run(input_data.initializer,
           feed_dict={data_initializer: training_data})
  sess.run(input_labels.initializer,
           feed_dict={label_initializer: training_labels})


設定trainable=False 可以防止該變量被數據流圖的 GraphKeys.TRAINABLE_VARIABLES 收集, 這樣我們就不會在訓練的時候嘗試更新它的值; 設定 collections=[] 可以防止GraphKeys.VARIABLES 把它收集后做為保存和恢復的中斷點。

無論哪種方式,我們可以用tf.train.slice_input_producer函數每次產生一個切片。這樣就會讓樣本在整個迭代中被打亂,所以在使用批處理的時候不需要再次打亂樣本。所以我們不使用shuffle_batch函數,取而代之的是純tf.train.batch 函數。 如果要使用多個線程進行預處理,需要將num_threads參數設置為大於1的數字。

 

從文件讀取數據(Reading from files)

從文件中讀取數據一般包含以下步驟:

  1. 文件名列表
  2. 文件名隨機排序(可選的)
  3. 迭代控制(可選的)
  4. 文件名隊列
  5. 針對輸入文件格式的閱讀器
  6. 記錄解析器
  7. 預處理器(可選的)
  8. 樣本隊列

 

TFRecords文件

TFRecords其實是一種二進制文件,雖然它不如其他格式好理解,但是它能更好的利用內存,更方便復制和移動,並且不需要單獨的標簽文件。總而言之,這樣的文件格式好處多多。

TFRecords文件包含了tf.train.Example 協議內存塊(protocol buffer)(協議內存塊包含了字段 Features)。我們可以寫一段代碼獲取你的數據, 將數據填入到Example協議內存塊(protocol buffer),將協議內存塊序列化為一個字符串, 並且通過tf.python_io.TFRecordWriter 寫入到TFRecords文件。

從TFRecords文件中讀取數據, 可以使用tf.TFRecordReadertf.parse_single_example解析器。這個操作可以將Example協議內存塊(protocol buffer)解析為張量。

生成TFRecords文件

我們使用tf.train.Example來定義我們要填入的數據格式,然后使用tf.python_io.TFRecordWriter來寫入。

import os
import tensorflow as tf from PIL import Image cwd = os.getcwd() ''' 此處我加載的數據目錄如下: 0 -- img1.jpg img2.jpg img3.jpg ... 1 -- img1.jpg img2.jpg ... 2 -- ... 這里的0, 1, 2...就是類別,也就是下文中的classes classes是我根據自己數據類型定義的一個列表,大家可以根據自己的數據情況靈活運用 ... ''' writer = tf.python_io.TFRecordWriter("train.tfrecords") for index, name in enumerate(classes): class_path = cwd + name + "/" for img_name in os.listdir(class_path): img_path = class_path + img_name img = Image.open(img_path) img = img.resize((224, 224)) img_raw = img.tobytes() #將圖片轉化為原生bytes example = tf.train.Example(features=tf.train.Features(feature={ "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index])), 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])) })) writer.write(example.SerializeToString()) #序列化為字符串 writer.close()

關於Example Feature的相關定義和詳細內容,我推薦去官網查看相關API。

基本的,一個Example中包含FeaturesFeatures里包含Feature(這里沒s)的字典。最后,Feature里包含有一個 FloatList, 或者ByteList,或者Int64List

就這樣,我們把相關的信息都存到了一個文件中,所以前面才說不用單獨的label文件。而且讀取也很方便。

接下來是一個簡單的讀取小例子:

for serialized_example in tf.python_io.tf_record_iterator("train.tfrecords"):
    example = tf.train.Example() example.ParseFromString(serialized_example) image = example.features.feature['image'].bytes_list.value label = example.features.feature['label'].int64_list.value # 可以做一些預處理之類的 print image, label

 

使用隊列讀取

一旦生成了TFRecords文件,為了高效地讀取數據,TF中使用隊列(queue)讀取數據。

def read_and_decode(filename):
    #根據文件名生成一個隊列
    filename_queue = tf.train.string_input_producer([filename])

    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)   #返回文件名和文件
    features = tf.parse_single_example(serialized_example,
                                       features={
                                           'label': tf.FixedLenFeature([], tf.int64),
                                           'img_raw' : tf.FixedLenFeature([], tf.string),
                                       })

    img = tf.decode_raw(features['img_raw'], tf.uint8)
    img = tf.reshape(img, [224, 224, 3])
    img = tf.cast(img, tf.float32) * (1. / 255) - 0.5
    label = tf.cast(features['label'], tf.int32)

    return img, label

之后我們可以在訓練的時候這樣使用

img, label = read_and_decode("train.tfrecords")

#使用shuffle_batch可以隨機打亂輸入
img_batch, label_batch = tf.train.shuffle_batch([img, label],
                                                batch_size=30, capacity=2000,
                                                min_after_dequeue=1000)
init = tf.initialize_all_variables()

with tf.Session() as sess:
    sess.run(init)
    threads = tf.train.start_queue_runners(sess=sess)
    for i in range(3):
        val, l= sess.run([img_batch, label_batch])
        #我們也可以根據需要對val, l進行處理
        #l = to_categorical(l, 12) 
        print(val.shape, l)

幾個注意事項:

第一,tensorflow里的graph能夠記住狀態(state),這使得TFRecordReader能夠記住tfrecord的位置,並且始終能返回下一個。而這就要求我們在使用之前,必須初始化整個graph,這里我們使用了函數tf.initialize_all_variables()來進行初始化。

第二,tensorflow中的隊列和普通的隊列差不多,不過它里面的operationtensor都是符號型的(symbolic),在調用sess.run()時才執行。

第三, TFRecordReader會一直彈出隊列中文件的名字,直到隊列為空。

第四,在我們使用tf.train.string_input_producer創建文件名隊列后,整個系統其實還是處於“停滯狀態”的,也就是說,我們文件名並沒有真正被加入到隊列中。此時如果我們開始計算,因為內存隊列中什么也沒有,計算單元就會一直等待,導致整個系統被阻塞。而使用tf.train.start_queue_runners之后,才會啟動填充隊列的線程,這時系統就不再“停滯”。此后計算單元就可以拿到數據並進行計算,整個程序也就跑起來了,這就是函數tf.train.start_queue_runners的用處。

 

文件名、隨機排序和迭代控制

我們首先要有個文件名列表,為了產生文件名列表,我們可以手動用Python輸入字符串,例如:

  • ["file0", "file1"]
  • [("file%d" % i) for i in range(2)]
  • [("file%d" % i) for i in range(2)]

我們也可以用tf.train.match_filenames_once函數來生成文件名列表。

有了文件名列表后,我們需要把它送入tf.train.string_input_producer函數中生成一個先入先出的文件名隊列,文件閱讀器需要從該隊列中讀取文件名。

string_input_producer(
    string_tensor,
    num_epochs=None,
    shuffle=True,
    seed=None,
    capacity=32,
    shared_name=None,
    name=None,
    cancel_op=None
)

這個QueueRunner的工作線程獨立於文件閱讀器的線程, 因此隨機排序和將文件名送入到文件名隊列這些過程不會阻礙文件閱讀器的運行。一個QueueRunner每次會把每批次的所有文件名送入隊列中,可以通過設置string_input_producer函數的shuffle參數來對文件名隨機排序,或者通過設置num_epochs來決定對string_tensor里的文件使用多少次,類型為整型,如果想要迭代控制則需要設置了num_epochs參數,同時需要添加tf.local_variables_initializer()進行初始化,如果不初始化會報錯。

 

文件格式

根據不同的文件格式, 應該選擇對應的文件閱讀器, 然后將文件名隊列提供給閱讀器的read方法。閱讀器每次從隊列中讀取一個文件,它的read方法會輸出一個key來表征讀入的文件和其中的紀錄(對於調試非常有用),同時得到一個字符串標量, 這個字符串標量可以被一個或多個解析器,或者轉換操作將其解碼為張量並且構造成為樣本。

根據不同的文件類型,有三種不同的文件閱讀器:

  • tf.TextLineReader
  • tf.FixedLengthRecordReader
  • tf.TFRecordReader

它們分別用於單行讀取(如CSV文件)、固定長度讀取(如CIFAR-10的.bin二進制文件)、TensorFlow標准格式讀取。

根據不同的文件閱讀器,有三種不同的解析器,它們分別對應上面三種閱讀器:

  • tf.decode_csv
  • tf.decode_raw
  • tf.parse_single_exampletf.parse_example

 

CSV文件

當我們讀入CSV格式的文件時,我們可以使用tf.TextLineReader閱讀器和tf.decode_csv解析器。例如:

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"]) 
# 創建一個Filename Queue
# 該例csv文件中共有5列數據,前四列為features,最后一列為label
reader = tf.TextLineReader() # 文件閱讀器
key, value = reader.read(filename_queue) # 每次執行閱讀器都從文件讀一行內容
# Default values, in case of empty columns. Also specifies the type of the decoded result.
record_defaults = [[1], [1], [1], [1], [1]] # 文件數據皆為整數
col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])
with tf.Session() as sess:
  # Start populating the filename queue.
  coord = tf.train.Coordinator() #創建一個協調器,管理線程
  threads = tf.train.start_queue_runners(coord=coord) #啟動QueueRunner, 此時文件名隊列已經進隊。
  for i in range(1200):
    # Retrieve a single instance:
    example, label = sess.run([features, col5])
  coord.request_stop()
  coord.join(threads)

 

record_defaults = [[1], [1], [1], [1], [1]]代表了解析的摸版,默認用,隔開,是用於指定矩陣格式以及數據類型的,CSV文件中的矩陣是NXM的,則此處為1XM,例如上例中M=5。[1]表示解析為整型,如果矩陣中有小數,則應為float型,[1]應該變為[1.0],[‘null’]解析為string類型。每次read的執行都會從文件中讀取一行內容, decode_csv 操作會解析這一行內容並將其轉為張量列表。在調用run或者eval去執行read之前, 必須先調用tf.train.start_queue_runners來將文件名填充到隊列。否則read操作會被阻塞到文件名隊列中有值為止。

col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults = record_defaults), 矩陣中有幾列,這里就要寫幾個參數,比如5列,就要寫到col5,不管你到底用多少。否則報錯。

 

固定長度記錄

我們也可以從二進制文件(.bin)中讀取固定長度的數據,使用的是tf.FixedLengthRecordReader閱讀器和tf.decode_raw解析器。decode_raw節點會把string轉化為uint8類型的張量。

例如CIFAR-10數據集就采用的固定長度的數據,1字節的標簽,后面跟着3072字節的圖像數據。使用uint8類型張量的標准操作可以把每個圖像的片段截取下來並且按照需要重組。下面有一個例子:

reader = tf.FixedLengthRecordReader(record_bytes = record_bytes)
key, value = reader.read(filename_queue)
record_bytes = tf.decode_raw(value, tf.uint8)
label = tf.cast(tf.slice(record_bytes, [0], [label_bytes]), tf.int32)
image_raw = tf.slice(record_bytes, [label_bytes], [image_bytes])
image_raw = tf.reshape(image_raw, [depth, height, width])
image = tf.transpose(image_raw, (1,2,0)) # 圖像形狀為[height, width, channels]     
image = tf.cast(image, tf.float32)


這里介紹上述代碼中出現的函數:tf.slice()

slice(
    input_,
    begin,
    size,
    name=None
)


從一個張量input中提取出長度為size的一部分,提取的起點由begin定義。size是一個向量,它代表着在每個維度提取出的tensor的大小。begin表示提取的位置,它表示的是input的起點偏離值,也就是從每個維度第幾個值開始提取。

begin從0開始,size從1開始,如果size[i]的值為-1,則第i個維度從begin處到余下的所有值都被提取出來。

例如:

# 'input' is [[[1, 1, 1], [2, 2, 2]],
#             [[3, 3, 3], [4, 4, 4]],
#             [[5, 5, 5], [6, 6, 6]]]
tf.slice(input, [1, 0, 0], [1, 1, 3]) ==> [[[3, 3, 3]]]
tf.slice(input, [1, 0, 0], [1, 2, 3]) ==> [[[3, 3, 3],
                                            [4, 4, 4]]]
tf.slice(input, [1, 0, 0], [2, 1, 3]) ==> [[[3, 3, 3]],
                                           [[5, 5, 5]]]

 

標准TensorFlow格式 

我們也可以把任意的數據轉換為TensorFlow所支持的格式, 這種方法使TensorFlow的數據集更容易與網絡應用架構相匹配。這種方法就是使用TFRecords文件,TFRecords文件包含了tf.train.Exampleprotocol buffer(里面包含了名為 Features的字段)。你可以寫一段代碼獲取你的數據, 將數據填入到Exampleprotocol buffer,將protocol buffer序列化為一個字符串, 並且通過tf.python_io.TFRecordWriter類寫入到TFRecords文件。

從TFRecords文件中讀取數據, 可以使用tf.TFRecordReader閱讀器以及tf.parse_single_example解析器。parse_single_example操作可以將Exampleprotocol buffer解析為張量。 具體可以參考如下例子,把MNIST數據集轉化為TFRecords格式:

SparseTensors這種稀疏輸入數據類型使用隊列來處理不是太好。如果要使用SparseTensors你就必須在批處理之后使用tf.parse_example去解析字符串記錄 (而不是在批處理之前使用tf.parse_single_example) 。

 

預處理

我們可以對輸入的樣本數據進行任意的預處理, 這些預處理不依賴於訓練參數, 比如數據歸一化, 提取隨機數據片,增加噪聲或失真等等。具體可以參考如下對CIFAR-10處理的例子:

 

批處理

經過了之前的步驟,在數據讀取流程的最后, 我們需要有另一個隊列來批量執行輸入樣本的訓練,評估或者推斷。根據要不要打亂順序,我們常用的有兩個函數:

  • tf.train.batch()
  • tf.train.shuffle_batch()

下面來分別介紹:

tf.train.batch()

tf.train.batch(
    tensors,
    batch_size,
    num_threads=1,
    capacity=32,
    enqueue_many=False,
    shapes=None,
    dynamic_pad=False,
    allow_smaller_final_batch=False,
    shared_name=None,
    name=None
)

該函數將會使用一個隊列,函數讀取一定數量的tensors送入隊列,然后每次從中選取batch_sizetensors組成一個新的tensors返回出來。

capacity參數決定了隊列的長度。

num_threads決定了有多少個線程進行入隊操作,如果設置的超過一個線程,它們將從不同文件不同位置同時讀取,可以更加充分的混合訓練樣本。

如果enqueue_many參數為False,則輸入參數tensors為一個形狀為[x, y, z]的張量,輸出為一個形狀為[batch_size, x, y, z]的張量。如果enqueue_many參數為True,則輸入參數tensors為一個形狀為[*, x, y, z]的張量,其中所有*的數值相同,輸出為一個形狀為[batch_size, x, y, z]的張量。

allow_smaller_final_batchTrue時,如果隊列中的張量數量不足batch_size,將會返回小於batch_size長度的張量,如果為False,剩下的張量會被丟棄。

tf.train.shuffle_batch()

tf.train.shuffle_batch(
    tensors,
    batch_size,
    capacity,
    min_after_dequeue,
    num_threads=1,
    seed=None,
    enqueue_many=False,
    shapes=None,
    allow_smaller_final_batch=False,
    shared_name=None,
    name=None
)

該函數類似於上面的tf.train.batch(),同樣創建一個隊列,主要區別是會首先把隊列中的張量進行亂序處理,然后再選取其中的batch_size個張量組成一個新的張量返回。但是新增加了幾個參數。

capacity參數依然為隊列的長度,建議capacity的取值如下:

min_after_dequeue + (num_threads + a small safety margin) * batch_size

min_after_dequeue這個參數的意思是隊列中,做dequeue(取數據)的操作后,線程要保證隊列中至少剩下min_after_dequeue個數據。如果min_after_dequeue設置的過少,則即使shuffleTrue,也達不到好的混合效果。

假設你有一個隊列,現在里面有m個數據,你想要每次隨機從隊列中取n個數據,則代表先混合了m個數據,再從中取走n個。

當第一次取走n個后,隊列就變為m-n個數據;當你下次再想要取n個時,假設隊列在此期間入隊進來了k個數據,則現在的隊列中有(m-n+k)個數據,則此時會從混合的(m-n+k)個數據中隨機取走n個。

如果隊列填充的速度比較慢,k就比較小,那你取出來的n個數據只是與周圍很小的一部分(m-n+k)個數據進行了混合。

因為我們的目的肯定是想盡最大可能的混合數據,因此設置min_after_dequeue,可以保證每次dequeue后都有足夠量的數據填充盡隊列,保證下次dequeue時可以很充分的混合數據。

但是min_after_dequeue也不能設置的太大,這樣會導致隊列填充的時間變長,尤其是在最初的裝載階段,會花費比較長的時間。

其他參數和tf.train.batch()相同。

 

這里我們使用tf.train.shuffle_batch函數來對隊列中的樣本進行亂序處理。如下的模版:

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label
def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue 越大意味着隨機效果越好但是也會占用更多的時間和內存
  # capacity 必須比 min_after_dequeue 大
  # 建議capacity的取值如下:
  # min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

 

class cifar10_data(object):
    def __init__(self, filename_queue):
        self.height = 32
        self.width = 32
        self.depth = 3
        self.label_bytes = 1
        self.image_bytes = self.height * self.width * self.depth
        self.record_bytes = self.label_bytes + self.image_bytes
        self.label, self.image = self.read_cifar10(filename_queue)
        
    def read_cifar10(self, filename_queue):
        reader = tf.FixedLengthRecordReader(record_bytes = self.record_bytes)
        key, value = reader.read(filename_queue)
        record_bytes = tf.decode_raw(value, tf.uint8)
        label = tf.cast(tf.slice(record_bytes, [0], [self.label_bytes]), tf.int32)
        image_raw = tf.slice(record_bytes, [self.label_bytes], [self.image_bytes])
        image_raw = tf.reshape(image_raw, [self.depth, self.height, self.width])
        image = tf.transpose(image_raw, (1,2,0))        
        image = tf.cast(image, tf.float32)
        return label, image
    
    
def inputs(data_dir, batch_size, train = True, name = 'input'):
    with tf.name_scope(name):
        if train:    
            filenames = [os.path.join(data_dir,'data_batch_%d.bin' % ii) 
                        for ii in range(1,6)]
            for f in filenames:
                if not tf.gfile.Exists(f):
                    raise ValueError('Failed to find file: ' + f)
                    
            filename_queue = tf.train.string_input_producer(filenames)
            read_input = cifar10_data(filename_queue)
            images = read_input.image
            images = tf.image.per_image_standardization(images)
            labels = read_input.label
            image, label = tf.train.shuffle_batch(
                                    [images,labels], batch_size = batch_size, 
                                    min_after_dequeue = 20000, capacity = 20192)
        
            return image, tf.reshape(label, [batch_size])
            
        else:
            filenames = [os.path.join(data_dir,'test_batch.bin')]
            for f in filenames:
                if not tf.gfile.Exists(f):
                    raise ValueError('Failed to find file: ' + f)
                    
            filename_queue = tf.train.string_input_producer(filenames)
            read_input = cifar10_data(filename_queue)
            images = read_input.image
            images = tf.image.per_image_standardization(images)
            labels = read_input.label
            image, label = tf.train.shuffle_batch(
                                    [images,labels], batch_size = batch_size, 
                                    min_after_dequeue = 20000, capacity = 20192)
        
            return image, tf.reshape(label, [batch_size])

 

這里介紹下函數tf.image.per_image_standardization(image),該函數對圖像進行線性變換使它具有零均值和單位方差,即規范化。其中參數image是一個3-D的張量,形狀為[height, width, channels]。一個具體的例子如下,該例采用了CIFAR-10數據集,采用了固定長度讀取的tf.FixedLengthRecordReader閱讀器和tf.decode_raw解析器,同時進行了數據預處理操作中的標准化操作,最后使用tf.train.shuffle_batch函數批量執行數據的亂序處理。

 

多個樣本和多個閱讀器

下面講分別展示三個不同Reader數目和不同樣本數的代碼示例:

文件准備 

$ echo -e "Alpha1,A1\nAlpha2,A2\nAlpha3,A3" > A.csv
$ echo -e "Bee1,B1\nBee2,B2\nBee3,B3" > B.csv
$ echo -e "Sea1,C1\nSea2,C2\nSea3,C3" > C.csv
$ cat A.csv
Alpha1,A1
Alpha2,A2
Alpha3,A3

 

單個Reader,單個樣本

import tensorflow as tf
# 生成一個先入先出隊列和一個QueueRunner
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
# 定義Reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# 定義Decoder
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
# 運行Graph
with tf.Session() as sess:
    coord = tf.train.Coordinator()  #創建一個協調器,管理線程
    threads = tf.train.start_queue_runners(coord=coord)  #啟動QueueRunner, 此時文件名隊列已經進隊。
    for i in range(10):
        print example.eval()   #取樣本的時候,一個Reader先從文件名隊列中取出文件名,讀出數據,Decoder解析后進入樣本隊列。
    coord.request_stop()
    coord.join(threads)
# outpt
Alpha1
Alpha2
Alpha3
Bee1
Bee2
Bee3
Sea1
Sea2
Sea3
Alpha1

 

單個Reader,多個樣本 

import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
# 使用tf.train.batch()會多加了一個樣本隊列和一個QueueRunner。Decoder解后數據會進入這個隊列,再批量出隊。
# 雖然這里只有一個Reader,但可以設置多線程,通過在tf.train.batch()中添加“num_threads=",相應增加線程數會提高讀取速度,但並不是線程越多越好。
example_batch, label_batch = tf.train.batch(
      [example, label], batch_size=5)
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(10):
        print example_batch.eval()
    coord.request_stop()
    coord.join(threads)
# output
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
# ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']
# ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']
# ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']
# ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']
# ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3']
# ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2']
# ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1']
# ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3']
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']

 

多個Reader,多個樣本

import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
record_defaults = [['null'], ['null']]
example_list = [tf.decode_csv(value, record_defaults=record_defaults)
                  for _ in range(2)]  # Reader設置為2
# 使用tf.train.batch_join(),可以使用多個reader,並行讀取數據。每個Reader使用一個線程。
example_batch, label_batch = tf.train.batch_join(
      example_list, batch_size=5)
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(10):
        print example_batch.eval()
    coord.request_stop()
    coord.join(threads)
    
# output
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
# ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']
# ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']
# ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']
# ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']
# ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3']
# ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2']
# ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1']
# ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3']
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']

 

注意

tf.train.batchtf.train.shuffle_batch函數是單個Reader讀取,但是可以多線程,通過設置num_threads參數來設置多線程。tf.train.batch_jointf.train.shuffle_batch_join可設置多Reader讀取,每個Reader使用一個線程。至於兩種方法的效率,單Reader時,2個線程就達到了速度的極限。多Reader時,2個Reader就達到了極限。所以並不是線程越多越快,甚至更多的線程反而會使效率下降。

上述兩種方法,前者相比於后者的好處是:

  • 避免了兩個不同的線程從同一個文件中讀取同一個樣本。
  • 避免了過多的磁盤搜索操作。

那么具體需要多少個讀取線程呢? 函數tf.train.shuffle_batch*graph提供了獲取文件名隊列中的元素個數之和的方法。 如果你有足夠多的讀取線程, 文件名隊列中的元素個數之和應該一直是一個略高於0的數。具體可以參考TensorBoard的教程。

 

創建線程並使用QueueRunner對象來獲取

我們要添加tf.train.QueueRunner對象到數據流圖中,在運行任何訓練步驟之前,需要調用tf.train.start_queue_runners函數,否則數據流圖將一直掛起,該函數將會啟動輸入管道的線程,填充樣本到隊列中,以便出隊操作可以從隊列中拿到樣本。這種情況下最好配合使用一個tf.train.Coordinator,這樣可以在發生錯誤的情況下正確地關閉這些線程。如果我們對訓練迭代數做了限制,那么需要使用一個訓練迭代數計數器,並且需要初始化它。推薦的代碼模板如下:

# Create the graph, etc.
init_op = tf.global_variables_initializer()
# Create a session for running operations in the Graph.
sess = tf.Session()
# Initialize the variables (like the epoch counter).
sess.run(init_op)
# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)
except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()
# Wait for threads to finish.
coord.join(threads)
sess.close()

 

 

Animated File Queues

如上圖所示,每個QueueRunner負責一個階段,處理那些需要在線程中運行的入隊操作的列表。一旦數據流圖構造成功,tf.train.start_queue_runners函數就會要求數據流圖中每個QueueRunner去開始它的線程運行入隊操作。

如果一切順利的話,我們可以執行訓練步驟,同時隊列也會被后台線程來填充。如果設置了最大訓練迭代數,在某些時候,樣本出隊的操作可能會得到一個tf.OutOfRangeError的錯誤。這其實是TensorFlow的“文件結束”(EOF)——這就意味着已經達到了最大訓練迭代數,已經沒有更多可用的樣本了。

最后一個因素是Coordinator。這是負責在收到任何關閉信號的時候,讓所有的線程都知道。最常見的情況是在發生異常時,比如說其中一個線程在運行某些操作時出現錯誤(或一個普通的Python異常)。

 

疑問:在達到最大訓練迭代數的時候如何關閉線程?

想象一下,我們有一個模型並且設置了最大訓練迭代數。這意味着,生成文件的那個線程只會在產生OutOfRange錯誤之前運行。QueueRunner會捕獲該錯誤,並且關閉文件名的隊列,最后退出線程。關閉隊列做了兩件事情:

  • 如果試着對文件名隊列執行入隊操作將發生錯誤。
  • 當前或將來的出隊操作要么成功(如果隊列中還有足夠的元素)或立即失敗(發生OutOfRange錯誤)。它們不會等待更多的元素被添加到隊列中,因為上面的一點已經保證了這種情況不會發生。

關鍵是,當在文件名隊列被關閉時候,有可能還有許多文件名在該隊列中,這樣下一階段的流水線(包括reader和其它預處理)還可以繼續運行一段時間。 一旦文件名隊列空了之后,如果后面的流水線還要嘗試從文件名隊列中取出一個文件名,這將會觸發OutOfRange錯誤。在這種情況下,即使你可能有一個QueueRunner關聯着多個線程,如果該出錯線程不是QueueRunner中最后的那個線程,那么OutOfRange錯誤只會使得這一個線程退出。而其他那些正處理自己的最后一個文件的線程繼續運行,直至他們完成為止。(但如果你使用的是tf.train.Coordinator來管理所有的線程,那么其他類型的錯誤將導致所有線程停止)。一旦所有的reader線程觸發OutOfRange錯誤,樣本隊列才會被關閉。

同樣,樣本隊列中會有一些已經入隊的元素,所以樣本訓練將一直持續直到樣本隊列中再沒有樣本為止。如果樣本隊列是一個RandomShuffleQueue,因為你使用了shuffle_batch 或者 shuffle_batch_join,所以通常不會出現以往那種隊列中的元素會比min_after_dequeue 定義的更少的情況。 然而,一旦該隊列被關閉,min_after_dequeue設置的限定值將失效,最終隊列將為空。在這一點來說,當實際訓練線程嘗試從樣本隊列中取出數據時,將會觸發OutOfRange錯誤,然后訓練線程會退出。一旦所有的訓練線程完成,tf.train.Coordinator.join會返回,你就可以正常退出了。

 

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM