本文整理了TensorFlow中的數據讀取方法,在TensorFlow中主要有三種方法讀取數據:
1. 供給數據(Feeding):在TensorFlow程序運行的每一步, 讓Python代碼來供給數據。
2. 預加載數據(Preloaded data):在TensorFlow圖中定義常量或變量來保存所有數據(僅適用於數據量比較小的情況)。
3. 從文件讀取數據(Reading from files):在TensorFlow圖的起始, 讓一個輸入管線從文件中讀取數據。
對於數據量較小而言,可能一般選擇直接將數據加載進內存,然后再分batch
輸入網絡進行訓練(tip:使用這種方法時,結合yield
使用更為簡潔,大家自己嘗試一下吧,我就不贅述了)。但是,如果數據量較大,這樣的方法就不適用了,因為太耗內存,所以這時最好使用tensorflow提供的隊列queue
,也就是第3種方法 從文件讀取數據。
供給數據(Feeding)
我們一般用tf.placeholder
節點來feed
數據,該節點不需要初始化也不包含任何數據,我們在執行run()
或者eval()
指令時通過feed_dict
參數把數據傳入graph
中來計算。如果在運行過程中沒有對tf.placeholder
節點傳入數據,程序會報錯。例如:
import tensorflow as tf # 設計Graph x1 = tf.placeholder(tf.int16) x2 = tf.placeholder(tf.int16) y = tf.add(x1, x2) # 用Python產生數據 li1 = [2, 3, 4] li2 = [4, 0, 1] # 打開一個session --> 喂數據 --> 計算y with tf.Session() as sess: print sess.run(y, feed_dict={x1: li1, x2: li2})
預加載數據(Preloaded data)
預加載數據方法僅限於用在可以完全加載到內存中的小數據集上,主要有兩種方法:
- 把數據存在常量(constant)中。
- 把數據存在變量(variable)中,我們初始化並且永不改變它的值。
用常量更簡單些,但會占用更多的內存,因為常量存儲在graph
數據結構內部。例如:
training_data = ... training_labels = ... with tf.Session(): input_data = tf.constant(training_data) input_labels = tf.constant(training_labels) ...
如果用變量的話,我們需要在graph
構建好之后初始化該變量。例如:
training_data = ... training_labels = ... with tf.Session() as sess: data_initializer = tf.placeholder(dtype=training_data.dtype, shape=training_data.shape) label_initializer = tf.placeholder(dtype=training_labels.dtype, shape=training_labels.shape) input_data = tf.Variable(data_initializer, trainable=False, collections=[]) input_labels = tf.Variable(label_initializer, trainable=False, collections=[]) ... sess.run(input_data.initializer, feed_dict={data_initializer: training_data}) sess.run(input_labels.initializer, feed_dict={label_initializer: training_labels})
設定trainable=False
可以防止該變量被數據流圖的 GraphKeys.TRAINABLE_VARIABLES
收集, 這樣我們就不會在訓練的時候嘗試更新它的值; 設定 collections=[]
可以防止GraphKeys.VARIABLES
把它收集后做為保存和恢復的中斷點。
無論哪種方式,我們可以用tf.train.slice_input_producer
函數每次產生一個切片。這樣就會讓樣本在整個迭代中被打亂,所以在使用批處理的時候不需要再次打亂樣本。所以我們不使用shuffle_batch
函數,取而代之的是純tf.train.batch
函數。 如果要使用多個線程進行預處理,需要將num_threads
參數設置為大於1的數字。
從文件讀取數據(Reading from files)
從文件中讀取數據一般包含以下步驟:
- 文件名列表
- 文件名隨機排序(可選的)
- 迭代控制(可選的)
- 文件名隊列
- 針對輸入文件格式的閱讀器
- 記錄解析器
- 預處理器(可選的)
- 樣本隊列
TFRecords文件
TFRecords其實是一種二進制文件,雖然它不如其他格式好理解,但是它能更好的利用內存,更方便復制和移動,並且不需要單獨的標簽文件。總而言之,這樣的文件格式好處多多。
TFRecords文件包含了tf.train.Example
協議內存塊(protocol buffer)(協議內存塊包含了字段 Features
)。我們可以寫一段代碼獲取你的數據, 將數據填入到Example
協議內存塊(protocol buffer),將協議內存塊序列化為一個字符串, 並且通過tf.python_io.TFRecordWriter
寫入到TFRecords文件。
從TFRecords文件中讀取數據, 可以使用tf.TFRecordReader
的tf.parse_single_example
解析器。這個操作可以將Example
協議內存塊(protocol buffer)解析為張量。
生成TFRecords文件
我們使用tf.train.Example
來定義我們要填入的數據格式,然后使用tf.python_io.TFRecordWriter
來寫入。
import os
import tensorflow as tf from PIL import Image cwd = os.getcwd() ''' 此處我加載的數據目錄如下: 0 -- img1.jpg img2.jpg img3.jpg ... 1 -- img1.jpg img2.jpg ... 2 -- ... 這里的0, 1, 2...就是類別,也就是下文中的classes classes是我根據自己數據類型定義的一個列表,大家可以根據自己的數據情況靈活運用 ... ''' writer = tf.python_io.TFRecordWriter("train.tfrecords") for index, name in enumerate(classes): class_path = cwd + name + "/" for img_name in os.listdir(class_path): img_path = class_path + img_name img = Image.open(img_path) img = img.resize((224, 224)) img_raw = img.tobytes() #將圖片轉化為原生bytes example = tf.train.Example(features=tf.train.Features(feature={ "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index])), 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])) })) writer.write(example.SerializeToString()) #序列化為字符串 writer.close()
關於Example
Feature
的相關定義和詳細內容,我推薦去官網查看相關API。
基本的,一個Example
中包含Features
,Features
里包含Feature
(這里沒s)的字典。最后,Feature
里包含有一個 FloatList
, 或者ByteList
,或者Int64List
就這樣,我們把相關的信息都存到了一個文件中,所以前面才說不用單獨的label文件。而且讀取也很方便。
接下來是一個簡單的讀取小例子:
for serialized_example in tf.python_io.tf_record_iterator("train.tfrecords"):
example = tf.train.Example() example.ParseFromString(serialized_example) image = example.features.feature['image'].bytes_list.value label = example.features.feature['label'].int64_list.value # 可以做一些預處理之類的 print image, label
使用隊列讀取
一旦生成了TFRecords文件,為了高效地讀取數據,TF中使用隊列(queue
)讀取數據。
def read_and_decode(filename): #根據文件名生成一個隊列 filename_queue = tf.train.string_input_producer([filename]) reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) #返回文件名和文件 features = tf.parse_single_example(serialized_example, features={ 'label': tf.FixedLenFeature([], tf.int64), 'img_raw' : tf.FixedLenFeature([], tf.string), }) img = tf.decode_raw(features['img_raw'], tf.uint8) img = tf.reshape(img, [224, 224, 3]) img = tf.cast(img, tf.float32) * (1. / 255) - 0.5 label = tf.cast(features['label'], tf.int32) return img, label
之后我們可以在訓練的時候這樣使用
img, label = read_and_decode("train.tfrecords") #使用shuffle_batch可以隨機打亂輸入 img_batch, label_batch = tf.train.shuffle_batch([img, label], batch_size=30, capacity=2000, min_after_dequeue=1000) init = tf.initialize_all_variables() with tf.Session() as sess: sess.run(init) threads = tf.train.start_queue_runners(sess=sess) for i in range(3): val, l= sess.run([img_batch, label_batch]) #我們也可以根據需要對val, l進行處理 #l = to_categorical(l, 12) print(val.shape, l)
幾個注意事項:
第一,tensorflow里的graph能夠記住狀態(state
),這使得TFRecordReader
能夠記住tfrecord
的位置,並且始終能返回下一個。而這就要求我們在使用之前,必須初始化整個graph,這里我們使用了函數tf.initialize_all_variables()
來進行初始化。
第二,tensorflow中的隊列和普通的隊列差不多,不過它里面的operation
和tensor
都是符號型的(symbolic
),在調用sess.run()
時才執行。
第三, TFRecordReader
會一直彈出隊列中文件的名字,直到隊列為空。
第四,在我們使用tf.train.string_input_producer創建文件名隊列后,整個系統其實還是處於“停滯狀態”的,也就是說,我們文件名並沒有真正被加入到隊列中。此時如果我們開始計算,因為內存隊列中什么也沒有,計算單元就會一直等待,導致整個系統被阻塞。而使用tf.train.start_queue_runners之后,才會啟動填充隊列的線程,這時系統就不再“停滯”。此后計算單元就可以拿到數據並進行計算,整個程序也就跑起來了,這就是函數tf.train.start_queue_runners的用處。
文件名、隨機排序和迭代控制
我們首先要有個文件名列表,為了產生文件名列表,我們可以手動用Python輸入字符串,例如:
["file0", "file1"]
[("file%d" % i) for i in range(2)]
[("file%d" % i) for i in range(2)]
我們也可以用tf.train.match_filenames_once
函數來生成文件名列表。
有了文件名列表后,我們需要把它送入tf.train.string_input_producer
函數中生成一個先入先出的文件名隊列,文件閱讀器需要從該隊列中讀取文件名。
string_input_producer( string_tensor, num_epochs=None, shuffle=True, seed=None, capacity=32, shared_name=None, name=None, cancel_op=None )
這個QueueRunner
的工作線程獨立於文件閱讀器的線程, 因此隨機排序和將文件名送入到文件名隊列這些過程不會阻礙文件閱讀器的運行。一個QueueRunner
每次會把每批次的所有文件名送入隊列中,可以通過設置string_input_producer
函數的shuffle
參數來對文件名隨機排序,或者通過設置num_epochs
來決定對string_tensor
里的文件使用多少次,類型為整型,如果想要迭代控制則需要設置了num_epochs
參數,同時需要添加tf.local_variables_initializer()
進行初始化,如果不初始化會報錯。
文件格式
根據不同的文件格式, 應該選擇對應的文件閱讀器, 然后將文件名隊列提供給閱讀器的read
方法。閱讀器每次從隊列中讀取一個文件,它的read
方法會輸出一個key
來表征讀入的文件和其中的紀錄(對於調試非常有用),同時得到一個字符串標量, 這個字符串標量可以被一個或多個解析器,或者轉換操作將其解碼為張量並且構造成為樣本。
根據不同的文件類型,有三種不同的文件閱讀器:
tf.TextLineReader
tf.FixedLengthRecordReader
tf.TFRecordReader
它們分別用於單行讀取(如CSV文件)、固定長度讀取(如CIFAR-10的.bin二進制文件)、TensorFlow標准格式讀取。
根據不同的文件閱讀器,有三種不同的解析器,它們分別對應上面三種閱讀器:
tf.decode_csv
tf.decode_raw
tf.parse_single_example
和tf.parse_example
CSV文件
當我們讀入CSV格式的文件時,我們可以使用tf.TextLineReader
閱讀器和tf.decode_csv
解析器。例如:
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"]) # 創建一個Filename Queue # 該例csv文件中共有5列數據,前四列為features,最后一列為label reader = tf.TextLineReader() # 文件閱讀器 key, value = reader.read(filename_queue) # 每次執行閱讀器都從文件讀一行內容 # Default values, in case of empty columns. Also specifies the type of the decoded result. record_defaults = [[1], [1], [1], [1], [1]] # 文件數據皆為整數 col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults) features = tf.stack([col1, col2, col3, col4]) with tf.Session() as sess: # Start populating the filename queue. coord = tf.train.Coordinator() #創建一個協調器,管理線程 threads = tf.train.start_queue_runners(coord=coord) #啟動QueueRunner, 此時文件名隊列已經進隊。 for i in range(1200): # Retrieve a single instance: example, label = sess.run([features, col5]) coord.request_stop() coord.join(threads)
record_defaults = [[1], [1], [1], [1], [1]]
代表了解析的摸版,默認用,
隔開,是用於指定矩陣格式以及數據類型的,CSV文件中的矩陣是NXM的,則此處為1XM,例如上例中M=5。[1]表示解析為整型,如果矩陣中有小數,則應為float型,[1]應該變為[1.0],[‘null’]解析為string類型。每次read
的執行都會從文件中讀取一行內容, decode_csv
操作會解析這一行內容並將其轉為張量列表。在調用run
或者eval
去執行read
之前, 必須先調用tf.train.start_queue_runners
來將文件名填充到隊列。否則read
操作會被阻塞到文件名隊列中有值為止。
col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults = record_defaults)
, 矩陣中有幾列,這里就要寫幾個參數,比如5列,就要寫到col5,不管你到底用多少。否則報錯。
固定長度記錄
我們也可以從二進制文件(.bin)中讀取固定長度的數據,使用的是tf.FixedLengthRecordReader
閱讀器和tf.decode_raw
解析器。decode_raw
節點會把string
轉化為uint8
類型的張量。
例如CIFAR-10數據集就采用的固定長度的數據,1字節的標簽,后面跟着3072字節的圖像數據。使用uint8
類型張量的標准操作可以把每個圖像的片段截取下來並且按照需要重組。下面有一個例子:
reader = tf.FixedLengthRecordReader(record_bytes = record_bytes) key, value = reader.read(filename_queue) record_bytes = tf.decode_raw(value, tf.uint8) label = tf.cast(tf.slice(record_bytes, [0], [label_bytes]), tf.int32) image_raw = tf.slice(record_bytes, [label_bytes], [image_bytes]) image_raw = tf.reshape(image_raw, [depth, height, width]) image = tf.transpose(image_raw, (1,2,0)) # 圖像形狀為[height, width, channels] image = tf.cast(image, tf.float32)
這里介紹上述代碼中出現的函數:tf.slice()
slice( input_, begin, size, name=None )
從一個張量input
中提取出長度為size
的一部分,提取的起點由begin
定義。size
是一個向量,它代表着在每個維度提取出的tensor
的大小。begin
表示提取的位置,它表示的是input
的起點偏離值,也就是從每個維度第幾個值開始提取。
begin
從0開始,size
從1開始,如果size[i]
的值為-1,則第i個維度從begin
處到余下的所有值都被提取出來。
例如:
# 'input' is [[[1, 1, 1], [2, 2, 2]], # [[3, 3, 3], [4, 4, 4]], # [[5, 5, 5], [6, 6, 6]]] tf.slice(input, [1, 0, 0], [1, 1, 3]) ==> [[[3, 3, 3]]] tf.slice(input, [1, 0, 0], [1, 2, 3]) ==> [[[3, 3, 3], [4, 4, 4]]] tf.slice(input, [1, 0, 0], [2, 1, 3]) ==> [[[3, 3, 3]], [[5, 5, 5]]]
標准TensorFlow格式
我們也可以把任意的數據轉換為TensorFlow所支持的格式, 這種方法使TensorFlow的數據集更容易與網絡應用架構相匹配。這種方法就是使用TFRecords文件,TFRecords文件包含了tf.train.Example
的protocol buffer(里面包含了名為 Features
的字段)。你可以寫一段代碼獲取你的數據, 將數據填入到Example
的protocol buffer,將protocol buffer序列化為一個字符串, 並且通過tf.python_io.TFRecordWriter
類寫入到TFRecords文件。
從TFRecords文件中讀取數據, 可以使用tf.TFRecordReader
閱讀器以及tf.parse_single_example
解析器。parse_single_example
操作可以將Example
protocol buffer解析為張量。 具體可以參考如下例子,把MNIST數據集轉化為TFRecords格式:
SparseTensors這種稀疏輸入數據類型使用隊列來處理不是太好。如果要使用SparseTensors你就必須在批處理之后使用tf.parse_example
去解析字符串記錄 (而不是在批處理之前使用tf.parse_single_example
) 。
預處理
我們可以對輸入的樣本數據進行任意的預處理, 這些預處理不依賴於訓練參數, 比如數據歸一化, 提取隨機數據片,增加噪聲或失真等等。具體可以參考如下對CIFAR-10處理的例子:
批處理
經過了之前的步驟,在數據讀取流程的最后, 我們需要有另一個隊列來批量執行輸入樣本的訓練,評估或者推斷。根據要不要打亂順序,我們常用的有兩個函數:
tf.train.batch()
tf.train.shuffle_batch()
下面來分別介紹:
tf.train.batch()
tf.train.batch( tensors, batch_size, num_threads=1, capacity=32, enqueue_many=False, shapes=None, dynamic_pad=False, allow_smaller_final_batch=False, shared_name=None, name=None )
該函數將會使用一個隊列,函數讀取一定數量的tensors
送入隊列,然后每次從中選取batch_size
個tensors
組成一個新的tensors
返回出來。
capacity
參數決定了隊列的長度。
num_threads
決定了有多少個線程進行入隊操作,如果設置的超過一個線程,它們將從不同文件不同位置同時讀取,可以更加充分的混合訓練樣本。
如果enqueue_many
參數為False
,則輸入參數tensors
為一個形狀為[x, y, z]
的張量,輸出為一個形狀為[batch_size, x, y, z]
的張量。如果enqueue_many
參數為True
,則輸入參數tensors
為一個形狀為[*, x, y, z]
的張量,其中所有*
的數值相同,輸出為一個形狀為[batch_size, x, y, z]
的張量。
當allow_smaller_final_batch
為True
時,如果隊列中的張量數量不足batch_size
,將會返回小於batch_size
長度的張量,如果為False
,剩下的張量會被丟棄。
tf.train.shuffle_batch()
tf.train.shuffle_batch( tensors, batch_size, capacity, min_after_dequeue, num_threads=1, seed=None, enqueue_many=False, shapes=None, allow_smaller_final_batch=False, shared_name=None, name=None )
該函數類似於上面的tf.train.batch()
,同樣創建一個隊列,主要區別是會首先把隊列中的張量進行亂序處理,然后再選取其中的batch_size
個張量組成一個新的張量返回。但是新增加了幾個參數。
capacity
參數依然為隊列的長度,建議capacity
的取值如下:
min_after_dequeue + (num_threads + a small safety margin) * batch_size
min_after_dequeue
這個參數的意思是隊列中,做dequeue(取數據)的操作后,線程要保證隊列中至少剩下min_after_dequeue
個數據。如果min_after_dequeue
設置的過少,則即使shuffle
為True
,也達不到好的混合效果。
假設你有一個隊列,現在里面有m個數據,你想要每次隨機從隊列中取n個數據,則代表先混合了m個數據,再從中取走n個。
當第一次取走n個后,隊列就變為m-n個數據;當你下次再想要取n個時,假設隊列在此期間入隊進來了k個數據,則現在的隊列中有(m-n+k)個數據,則此時會從混合的(m-n+k)個數據中隨機取走n個。
如果隊列填充的速度比較慢,k就比較小,那你取出來的n個數據只是與周圍很小的一部分(m-n+k)個數據進行了混合。
因為我們的目的肯定是想盡最大可能的混合數據,因此設置
min_after_dequeue
,可以保證每次dequeue后都有足夠量的數據填充盡隊列,保證下次dequeue時可以很充分的混合數據。但是
min_after_dequeue
也不能設置的太大,這樣會導致隊列填充的時間變長,尤其是在最初的裝載階段,會花費比較長的時間。
其他參數和tf.train.batch()
相同。
這里我們使用tf.train.shuffle_batch
函數來對隊列中的樣本進行亂序處理。如下的模版:
def read_my_file_format(filename_queue): reader = tf.SomeReader() key, record_string = reader.read(filename_queue) example, label = tf.some_decoder(record_string) processed_example = some_processing(example) return processed_example, label def input_pipeline(filenames, batch_size, num_epochs=None): filename_queue = tf.train.string_input_producer( filenames, num_epochs=num_epochs, shuffle=True) example, label = read_my_file_format(filename_queue) # min_after_dequeue 越大意味着隨機效果越好但是也會占用更多的時間和內存 # capacity 必須比 min_after_dequeue 大 # 建議capacity的取值如下: # min_after_dequeue + (num_threads + a small safety margin) * batch_size min_after_dequeue = 10000 capacity = min_after_dequeue + 3 * batch_size example_batch, label_batch = tf.train.shuffle_batch( [example, label], batch_size=batch_size, capacity=capacity, min_after_dequeue=min_after_dequeue) return example_batch, label_batch
class cifar10_data(object): def __init__(self, filename_queue): self.height = 32 self.width = 32 self.depth = 3 self.label_bytes = 1 self.image_bytes = self.height * self.width * self.depth self.record_bytes = self.label_bytes + self.image_bytes self.label, self.image = self.read_cifar10(filename_queue) def read_cifar10(self, filename_queue): reader = tf.FixedLengthRecordReader(record_bytes = self.record_bytes) key, value = reader.read(filename_queue) record_bytes = tf.decode_raw(value, tf.uint8) label = tf.cast(tf.slice(record_bytes, [0], [self.label_bytes]), tf.int32) image_raw = tf.slice(record_bytes, [self.label_bytes], [self.image_bytes]) image_raw = tf.reshape(image_raw, [self.depth, self.height, self.width]) image = tf.transpose(image_raw, (1,2,0)) image = tf.cast(image, tf.float32) return label, image def inputs(data_dir, batch_size, train = True, name = 'input'): with tf.name_scope(name): if train: filenames = [os.path.join(data_dir,'data_batch_%d.bin' % ii) for ii in range(1,6)] for f in filenames: if not tf.gfile.Exists(f): raise ValueError('Failed to find file: ' + f) filename_queue = tf.train.string_input_producer(filenames) read_input = cifar10_data(filename_queue) images = read_input.image images = tf.image.per_image_standardization(images) labels = read_input.label image, label = tf.train.shuffle_batch( [images,labels], batch_size = batch_size, min_after_dequeue = 20000, capacity = 20192) return image, tf.reshape(label, [batch_size]) else: filenames = [os.path.join(data_dir,'test_batch.bin')] for f in filenames: if not tf.gfile.Exists(f): raise ValueError('Failed to find file: ' + f) filename_queue = tf.train.string_input_producer(filenames) read_input = cifar10_data(filename_queue) images = read_input.image images = tf.image.per_image_standardization(images) labels = read_input.label image, label = tf.train.shuffle_batch( [images,labels], batch_size = batch_size, min_after_dequeue = 20000, capacity = 20192) return image, tf.reshape(label, [batch_size])
這里介紹下函數tf.image.per_image_standardization(image)
,該函數對圖像進行線性變換使它具有零均值和單位方差,即規范化。其中參數image
是一個3-D的張量,形狀為[height, width, channels]
。一個具體的例子如下,該例采用了CIFAR-10數據集,采用了固定長度讀取的tf.FixedLengthRecordReader
閱讀器和tf.decode_raw
解析器,同時進行了數據預處理操作中的標准化操作,最后使用tf.train.shuffle_batch
函數批量執行數據的亂序處理。
多個樣本和多個閱讀器
下面講分別展示三個不同Reader數目和不同樣本數的代碼示例:
文件准備
$ echo -e "Alpha1,A1\nAlpha2,A2\nAlpha3,A3" > A.csv $ echo -e "Bee1,B1\nBee2,B2\nBee3,B3" > B.csv $ echo -e "Sea1,C1\nSea2,C2\nSea3,C3" > C.csv $ cat A.csv Alpha1,A1 Alpha2,A2 Alpha3,A3
單個Reader,單個樣本
import tensorflow as tf # 生成一個先入先出隊列和一個QueueRunner filenames = ['A.csv', 'B.csv', 'C.csv'] filename_queue = tf.train.string_input_producer(filenames, shuffle=False) # 定義Reader reader = tf.TextLineReader() key, value = reader.read(filename_queue) # 定義Decoder example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']]) # 運行Graph with tf.Session() as sess: coord = tf.train.Coordinator() #創建一個協調器,管理線程 threads = tf.train.start_queue_runners(coord=coord) #啟動QueueRunner, 此時文件名隊列已經進隊。 for i in range(10): print example.eval() #取樣本的時候,一個Reader先從文件名隊列中取出文件名,讀出數據,Decoder解析后進入樣本隊列。 coord.request_stop() coord.join(threads) # outpt Alpha1 Alpha2 Alpha3 Bee1 Bee2 Bee3 Sea1 Sea2 Sea3 Alpha1
單個Reader,多個樣本
import tensorflow as tf filenames = ['A.csv', 'B.csv', 'C.csv'] filename_queue = tf.train.string_input_producer(filenames, shuffle=False) reader = tf.TextLineReader() key, value = reader.read(filename_queue) example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']]) # 使用tf.train.batch()會多加了一個樣本隊列和一個QueueRunner。Decoder解后數據會進入這個隊列,再批量出隊。 # 雖然這里只有一個Reader,但可以設置多線程,通過在tf.train.batch()中添加“num_threads=",相應增加線程數會提高讀取速度,但並不是線程越多越好。 example_batch, label_batch = tf.train.batch( [example, label], batch_size=5) with tf.Session() as sess: coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) for i in range(10): print example_batch.eval() coord.request_stop() coord.join(threads) # output # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1'] # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3'] # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2'] # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1'] # ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3'] # ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2'] # ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1'] # ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3'] # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
多個Reader,多個樣本
import tensorflow as tf filenames = ['A.csv', 'B.csv', 'C.csv'] filename_queue = tf.train.string_input_producer(filenames, shuffle=False) reader = tf.TextLineReader() key, value = reader.read(filename_queue) record_defaults = [['null'], ['null']] example_list = [tf.decode_csv(value, record_defaults=record_defaults) for _ in range(2)] # Reader設置為2 # 使用tf.train.batch_join(),可以使用多個reader,並行讀取數據。每個Reader使用一個線程。 example_batch, label_batch = tf.train.batch_join( example_list, batch_size=5) with tf.Session() as sess: coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) for i in range(10): print example_batch.eval() coord.request_stop() coord.join(threads) # output # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1'] # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3'] # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2'] # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1'] # ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3'] # ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2'] # ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1'] # ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3'] # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
注意
tf.train.batch
與tf.train.shuffle_batch
函數是單個Reader讀取,但是可以多線程,通過設置num_threads
參數來設置多線程。tf.train.batch_join
與tf.train.shuffle_batch_join
可設置多Reader讀取,每個Reader使用一個線程。至於兩種方法的效率,單Reader時,2個線程就達到了速度的極限。多Reader時,2個Reader就達到了極限。所以並不是線程越多越快,甚至更多的線程反而會使效率下降。
上述兩種方法,前者相比於后者的好處是:
- 避免了兩個不同的線程從同一個文件中讀取同一個樣本。
- 避免了過多的磁盤搜索操作。
那么具體需要多少個讀取線程呢? 函數tf.train.shuffle_batch*
為graph
提供了獲取文件名隊列中的元素個數之和的方法。 如果你有足夠多的讀取線程, 文件名隊列中的元素個數之和應該一直是一個略高於0的數。具體可以參考TensorBoard的教程。
創建線程並使用QueueRunner對象來獲取
我們要添加tf.train.QueueRunner
對象到數據流圖中,在運行任何訓練步驟之前,需要調用tf.train.start_queue_runners
函數,否則數據流圖將一直掛起,該函數將會啟動輸入管道的線程,填充樣本到隊列中,以便出隊操作可以從隊列中拿到樣本。這種情況下最好配合使用一個tf.train.Coordinator
,這樣可以在發生錯誤的情況下正確地關閉這些線程。如果我們對訓練迭代數做了限制,那么需要使用一個訓練迭代數計數器,並且需要初始化它。推薦的代碼模板如下:
# Create the graph, etc. init_op = tf.global_variables_initializer() # Create a session for running operations in the Graph. sess = tf.Session() # Initialize the variables (like the epoch counter). sess.run(init_op) # Start input enqueue threads. coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) try: while not coord.should_stop(): # Run training steps or whatever sess.run(train_op) except tf.errors.OutOfRangeError: print('Done training -- epoch limit reached') finally: # When done, ask the threads to stop. coord.request_stop() # Wait for threads to finish. coord.join(threads) sess.close()

如上圖所示,每個QueueRunner
負責一個階段,處理那些需要在線程中運行的入隊操作的列表。一旦數據流圖構造成功,tf.train.start_queue_runners
函數就會要求數據流圖中每個QueueRunner
去開始它的線程運行入隊操作。
如果一切順利的話,我們可以執行訓練步驟,同時隊列也會被后台線程來填充。如果設置了最大訓練迭代數,在某些時候,樣本出隊的操作可能會得到一個tf.OutOfRangeError
的錯誤。這其實是TensorFlow的“文件結束”(EOF)——這就意味着已經達到了最大訓練迭代數,已經沒有更多可用的樣本了。
最后一個因素是Coordinator
。這是負責在收到任何關閉信號的時候,讓所有的線程都知道。最常見的情況是在發生異常時,比如說其中一個線程在運行某些操作時出現錯誤(或一個普通的Python異常)。
疑問:在達到最大訓練迭代數的時候如何關閉線程?
想象一下,我們有一個模型並且設置了最大訓練迭代數。這意味着,生成文件的那個線程只會在產生OutOfRange
錯誤之前運行。QueueRunner
會捕獲該錯誤,並且關閉文件名的隊列,最后退出線程。關閉隊列做了兩件事情:
- 如果試着對文件名隊列執行入隊操作將發生錯誤。
- 當前或將來的出隊操作要么成功(如果隊列中還有足夠的元素)或立即失敗(發生
OutOfRange
錯誤)。它們不會等待更多的元素被添加到隊列中,因為上面的一點已經保證了這種情況不會發生。
關鍵是,當在文件名隊列被關閉時候,有可能還有許多文件名在該隊列中,這樣下一階段的流水線(包括reader和其它預處理)還可以繼續運行一段時間。 一旦文件名隊列空了之后,如果后面的流水線還要嘗試從文件名隊列中取出一個文件名,這將會觸發OutOfRange
錯誤。在這種情況下,即使你可能有一個QueueRunner
關聯着多個線程,如果該出錯線程不是QueueRunner
中最后的那個線程,那么OutOfRange
錯誤只會使得這一個線程退出。而其他那些正處理自己的最后一個文件的線程繼續運行,直至他們完成為止。(但如果你使用的是tf.train.Coordinator
來管理所有的線程,那么其他類型的錯誤將導致所有線程停止)。一旦所有的reader線程觸發OutOfRange
錯誤,樣本隊列才會被關閉。
同樣,樣本隊列中會有一些已經入隊的元素,所以樣本訓練將一直持續直到樣本隊列中再沒有樣本為止。如果樣本隊列是一個RandomShuffleQueue
,因為你使用了shuffle_batch
或者 shuffle_batch_join
,所以通常不會出現以往那種隊列中的元素會比min_after_dequeue
定義的更少的情況。 然而,一旦該隊列被關閉,min_after_dequeue
設置的限定值將失效,最終隊列將為空。在這一點來說,當實際訓練線程嘗試從樣本隊列中取出數據時,將會觸發OutOfRange
錯誤,然后訓練線程會退出。一旦所有的訓練線程完成,tf.train.Coordinator.join
會返回,你就可以正常退出了。
參考
- Reading data | TensorFlow
- Custom Data Readers | TensorFlow
- TF Boys (TensorFlow Boys ) 養成記(二): TensorFlow 數據讀取 - Charles-Wan - 博客園
- tf.train.string_input_producer | TensorFlow
- tf.train.batch | TensorFlow
- tf.train.shuffle_batch | TensorFlow
- honggang.io
- 數據讀取 - TensorFlow 官方文檔中文版 - 極客學院Wiki
- TensorFlow官網教程Convolutional Neural Networks 難點詳解 - 瑪莎魚的博客 - CSDN博客