TensorFlow程序讀取數據一共有3種方法:
- 供給數據(Feeding): 在TensorFlow程序運行的每一步, 讓Python代碼來供給數據。
- 從文件讀取數據: 在TensorFlow圖的起始, 讓一個輸入管道從文件中讀取數據。
- 預加載數據: 在TensorFlow圖中定義常量或變量來保存所有數據(僅適用於數據量比較小的情況)。
一 預加載數據
import tensorflow as tf x1 = tf.constant([2,3,4]) x2 = tf.constant([4,0,1]) y = tf.add(x1,x2) with tf.Session() as sess: print(sess.run(y))
在這里使用x1,x2保存具體的值,即將數據直接內嵌到圖中,再將圖傳入會話中執行,當數據量較大時,圖的輸出會遇到效率問題。
二 供給數據
import tensorflow as tf x1 = tf.placeholder(tf.int32) x2 = tf.placeholder(tf.int32) #用python產生數據 v1 = [2,3,4] v2 = [4,0,1] y = tf.add(x1,x2) with tf.Session() as sess: print(sess.run(y,feed_dict={x1:v1,x2:v2}))
在這里x1,x2只是占位符,沒有具體的值,那么運行的時候去哪取值呢?這時候就要用到sess.run()的feed_dict參數,將python產生的數據傳入,並計算y。
以上兩種方法都很方便,但是遇到大型數據的時候就會很吃力,即使是Feed_dict,中間環節的增加也是不小的開銷,因為數據量大的時候,TensorFlow程序運行的每一步,我們都需要使用python代碼去從文件中讀取數據,並對讀取到的文件數據進行解碼。最優的方案就是在圖中定義好文件讀取的方法,讓TF自己從文件中讀取數據,並解碼成可用的樣本集。
三 TensorFlow中的隊列機制
從文件中讀取數據的方法有很多,比如可以在一個文本里面寫入圖片數據的路徑和標簽,然后用tensorflow的read_file()讀入圖片;也可以將圖片和標簽的值直接存放在CSV或者txt文件。
我們會在后面陸續介紹以下幾種讀取文件的方式:
- 從字典結構的數據文件讀取
- 從bin文件讀取
- 從CSV(TXT)讀取
- 從原圖讀取
- TFRecord格式文件的讀取
在講解文件的讀取之前,我們需要先了解一下TensorFlow中的隊列機制,后面也會詳細介紹。
TensorFlow提供了一個隊列機制,通過多線程將讀取數據與計算數據分開。因為在處理海量數據集的訓練時,無法把數據集一次全部載入到內存中,需要一邊從硬盤中讀取,一邊進行訓練,為了加快訓練速度,我們可以采用多個線程讀取數據,一個線程消耗數據。
下面簡要介紹一下,TensorFlow里與Queue有關的概念和用法。詳細內容點擊原文。
其實概念只有三個:
Queue
是TF隊列和緩存機制的實現QueueRunner
是TF中對操作Queue的線程的封裝Coordinator
是TF中用來協調線程運行的工具
雖然它們經常同時出現,但這三樣東西在TensorFlow里面是可以單獨使用的,不妨先分開來看待。
1.Queue
據實現的方式不同,分成具體的幾種類型,例如:
- tf.FIFOQueue :按入列順序出列的隊列
- tf.RandomShuffleQueue :隨機順序出列的隊列
- tf.PaddingFIFOQueue :以固定長度批量出列的隊列
- tf.PriorityQueue :帶優先級出列的隊列
- ... ...
這些類型的Queue除了自身的性質不太一樣外,創建、使用的方法基本是相同的。
創建函數的參數:
tf.FIFOQueue(capacity, dtypes, shapes=None, names=None,
shared_name=None, name="fifo_queue")
#創建的圖:一個先入先出隊列,以及初始化,出隊,+1,入隊操作 q = tf.FIFOQueue(3, "float") init = q.enqueue_many(([0.1, 0.2, 0.3],)) x = q.dequeue() y = x + 1 q_inc = q.enqueue([y]) #開啟一個session,session是會話,會話的潛在含義是狀態保持,各種tensor的狀態保持 with tf.Session() as sess: sess.run(init) for i in range(2): sess.run(q_inc) quelen = sess.run(q.size()) for i in range(quelen): print (sess.run(q.dequeue()))
2. QueueRunner
之前的例子中,入隊操作都在主線程中進行,Session中可以多個線程一起運行。 在數據輸入的應用場景中,入隊操作從硬盤上讀取,入隊操作是從硬盤中讀取輸入,放到內存當中,速度較慢。 使用QueueRunner
可以創建一系列新的線程進行入隊操作,讓主線程繼續使用數據。如果在訓練神經網絡的場景中,就是訓練網絡和讀取數據是異步的,主線程在訓練網絡,另一個線程在將數據從硬盤讀入內存。
''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 創建QueueRunner # 用多個線程向隊列添加數據 # 這里實際創建了4個線程,兩個增加計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啟動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue()))
能正確輸出結果,但是最后會報錯,ERROR:tensorflow:Exception in QueueRunner: Session has been closed.也就是說,當循環結束后,該Session就會自動關閉,相當於main函數已經結束了。
''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 創建QueueRunner # 用多個線程向隊列添加數據 # 這里實際創建了4個線程,兩個增加計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) ''' #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啟動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue())) ''' # 主線程 sess = tf.Session() sess.run(tf.initialize_all_variables()) # 啟動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) # 主線程 for i in range(0, 10): print(sess.run(q.dequeue()))
不使用with tf.Session,那么Session就不會自動關閉。
並不是我們設想的1,2,3,4,本質原因是增加計數的進程會不停的后台運行,執行入隊的進程會先執行10次(因為隊列長度只有10),然后主線程開始消費數據,當一部分數據消費被后,入隊的進程又會開始執行。最終主線程消費完10個數據后停止,但其他線程繼續運行,程序不會結束。
經驗:因為tensorflow是在圖上進行計算,要驅動一張圖進行計算,必須要送入數據,如果說數據沒有送進去,那么sess.run(),就無法執行,tf也不會主動報錯,提示沒有數據送進去,其實tf也不能主動報錯,因為tf的訓練過程和讀取數據的過程其實是異步的。tf會一直掛起,等待數據准備好。現象就是tf的程序不報錯,但是一直不動,跟掛起類似。
''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 創建QueueRunner # 用多個線程向隊列添加數據 # 這里實際創建了4個線程,兩個增加計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啟動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue()))
上圖將生成數據的線程注釋掉,程序就會卡在sess.run(q.dequeue()),等待數據的到來QueueRunner是用來啟動入隊線程用的。
3.Coordinator
Coordinator是個用來保存線程組運行狀態的協調器對象,它和TensorFlow的Queue沒有必然關系,是可以單獨和Python線程使用的。例如:
''' Coordinator ''' import threading, time # 子線程函數 def loop(coord, id): t = 0 while not coord.should_stop(): print(id) time.sleep(1) t += 1 # 只有1號線程調用request_stop方法 if (t >= 2 and id == 0): coord.request_stop() # 主線程 coord = tf.train.Coordinator() # 使用Python API創建10個線程 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)] # 啟動所有線程,並等待線程結束 for t in threads: t.start() coord.join(threads)
將這個程序運行起來,會發現所有的子線程執行完兩個周期后都會停止,主線程會等待所有子線程都停止后結束,從而使整個程序結束。由此可見,只要有任何一個線程調用了Coordinator的request_stop
方法,所有的線程都可以通過should_stop
方法感知並停止當前線程。
將QueueRunner和Coordinator一起使用,實際上就是封裝了這個判斷操作,從而使任何一個出現異常時,能夠正常結束整個程序,同時主線程也可以直接調用request_stop
方法來停止所有子線程的執行。
簡要 介紹完了TensorFlow中隊列機制后,我們再來看一下如何從文件中讀取數據。
四 從文件中讀取數據
1.從字典結構的數據文件讀取(python數據格式)
(1)在介紹字典結構的數據文件的讀取之前,我們先來介紹怎么創建字典結構的數據文件。
- 先要准備好圖片文件,我們使用Open CV3進行圖像讀取。
- 把cv2.imread()讀取到的圖像進行裁切,扭曲,等處理。
- 使用numpy才對數據進行處理,比如維度合並。
- 把處理好的每一張圖像的數據和標簽分別存放在對應的list(或者ndarray)中。
- 創建一個字典,包含兩個元素‘data’和'labels',並分別賦值為上面的list。
- 使用pickle模塊對字典進行序列化,並保存到文件中。
具體代碼我們查看如下文章:圖片存儲為cifar的Python數據格式
如果針對圖片比較多的情況,我們不太可能把所有圖像都寫入個文件,我們可以分批把圖像寫入幾個文件中。
(2)cifar10數據有三種版本,分別是MATLAB,Python和bin版本 數據下載鏈接: http://www.cs.toronto.edu/~kriz/cifar.html
其中Python版本的數據即是以字典結構存儲的數據 。
針對字典結構的數據文件讀取,我在AlexNet那節中有詳細介紹,主要就是通過pickle模塊對文件進行反序列化,獲取我們所需要的數據。
2.從bin文件讀取
在官網的cifar的例子中就是從bin文件中讀取的。bin文件需要以一定的size格式存儲,比如每個樣本的值占多少字節,label占多少字節,且這對於每個樣本都是固定的,然后一個挨着一個存儲。這樣就可以使用tf.FixedLengthRecordReader 類來每次讀取固定長度的字節,正好對應一個樣本存儲的字節(包括label)。並且用tf.decode_raw進行解析。
(1)制作bin file
如何將自己的圖片存為bin file,可以看看下面這篇博客,這篇博客使用C++和opencv將圖片存為二進制文件: http://blog.csdn.net/code_better/article/details/53289759
(2)從bin file讀入
在后面會詳細講解如何從二進制記錄文件中讀取數據,並以cifar10_input.py作為案例。
def read_cifar10(filename_queue): """Reads and parses examples from CIFAR10 data files. Recommendation: if you want N-way read parallelism, call this function N times. This will give you N independent Readers reading different files & positions within those files, which will give better mixing of examples. Args: filename_queue: A queue of strings with the filenames to read from. Returns: An object representing a single example, with the following fields: height: number of rows in the result (32) width: number of columns in the result (32) depth: number of color channels in the result (3) key: a scalar string Tensor describing the filename & record number for this example. label: an int32 Tensor with the label in the range 0..9. uint8image: a [height, width, depth] uint8 Tensor with the image data """ class CIFAR10Record(object): pass result = CIFAR10Record() # Dimensions of the images in the CIFAR-10 dataset. # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the # input format. label_bytes = 1 # 2 for CIFAR-100 result.height = 32 result.width = 32 result.depth = 3 image_bytes = result.height * result.width * result.depth # Every record consists of a label followed by the image, with a # fixed number of bytes for each. record_bytes = label_bytes + image_bytes # Read a record, getting filenames from the filename_queue. No # header or footer in the CIFAR-10 format, so we leave header_bytes # and footer_bytes at their default of 0. reader = tf.FixedLengthRecordReader(record_bytes=record_bytes) result.key, value = reader.read(filename_queue) # Convert from a string to a vector of uint8 that is record_bytes long. record_bytes = tf.decode_raw(value, tf.uint8) # The first bytes represent the label, which we convert from uint8->int32. result.label = tf.cast( tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32) # The remaining bytes after the label represent the image, which we reshape # from [depth * height * width] to [depth, height, width]. depth_major = tf.reshape( tf.strided_slice(record_bytes, [label_bytes], [label_bytes + image_bytes]), [result.depth, result.height, result.width]) # Convert from [depth, height, width] to [height, width, depth]. result.uint8image = tf.transpose(depth_major, [1, 2, 0]) return result
這段代碼如果看不懂,可以先跳過。
3.從CSV(TXT)文件讀取
有的時候在數據量不是很大的時候,可以從CSV或者TXT文件進行讀取。
(1)制作CSV(TXT)數據文本
CSV (TXT)一般是一行存一個樣本(包括樣本值和label),用逗號隔開。用python的普通文本寫入即可。
(2)讀取的時候tf.TextLineReader 類來每次讀取一行,並使用tf.decode_csv來對每一行進行解析。
這里主要介紹一下:
tf.decode_csv(records, record_defaults, field_delim=None, name=None)
- 首先records與第二種方法中相同,為reader讀到的內容,這里為CSV (TXT)的一行。
- 一般一行里面的值會用逗號或者空格隔開,這里第三個輸入參數就是指定用什么來進行分割,默認為逗號。
- 第二個輸入參數是指定分割后每個屬性的類型,比如分割后會有三列,那么第二個參數就應該是[[‘int32’], [], [‘string’]], 可不指定類型(設為空[])也可以。如果分割后的屬性比較多,比如有100個,可以用[ []*100 ]來表示
col= tf.decode_csv(records, record_defaults=[ [ ]*100 ], field_delim=' ', name=None)
返回的col是長度為100的list。
需要注意的是,當數據量比較大的時候,存成CSV或TXT文件要比BIN文件大的多,因此在TF中讀取的速度也會慢很多。因此盡量不要讀取大的CSV的方式來輸入。
在后面我們會詳細講解如何從CSV文件中讀取數據,並有具體的案例。
4 從原圖中讀取
(1)制作數據路徑文件
一行一例,每例包括該樣本的地址和label,用逗號分割開,用python普通文件寫入即可
(2)讀取
很多情況下我們的圖片訓練集就是原始圖片本身,並沒有像cifar dataset那樣存成bin等格式。因此我們需要根據一個train_list列表,去挨個讀取圖片。這里我用到的方法是首先將train_list.txt中的image list(也就是每一行有圖片的路徑和label組成)讀入隊列中,那么對每次dequeue的內容中可以提取當前圖片的路徑和label。
filename = os.path.join(data_dir, trainfilename) with open(filename) as fid: content = fid.read() content = content.split('\n') content = content[:-1] valuequeue = tf.train.string_input_producer(content,shuffle=True) value = valuequeue.dequeue() dir, labels = tf.decode_csv(records=value, record_defaults=[["string"], [""]], field_delim=" ") labels = tf.string_to_number(labels, tf.int32) imagecontent = tf.read_file(dir) image = tf.image.decode_png(imagecontent, channels=3, dtype=tf.uint8) image = tf.cast(image, tf.float32) #將圖片統一為32*32大小的 image = tf.image.resize_images(image,[32,32]) image = tf.reshape(image,[result.depth, result.height, result.width]) # Convert from [depth, height, width] to [height, width, depth]. result.uint8image = tf.transpose(image, [1, 2, 0])
不過這個方法對電腦輸入輸出要求比較高,如果機械硬盤有壞道,就會報Input/Output error,出現這種情況,要修復機械硬盤壞道。
5.從TFRecord文件讀取
TFrecord是Tensorflow推薦的數據集格式,與Tensorflow框架緊密結合。在TensorFlow中提供了一系列接口可以訪問TFRecord格式。后面會詳細介紹如何將原始圖片文件轉換為TFRecord格式,然后在運行中通過多線程的方式來讀取。
五 QueueRunner和Coordinator結合方式一
在TensorFlow中用Queue的經典模式有兩種,都是配合了QueueRunner和Coordinator一起使用的。
這里先介紹第一種方法,顯式的創建QueueRunner,然后調用它的create_threads
方法啟動線程。例如下面這段代碼:
''' 配合使用 ''' import numpy as np # 1000個4維輸入向量,每個數取值為1-10之間的隨機數 data = 10 * np.random.randn(1000, 4) + 1 # 1000個隨機的目標值,值為0或1 target = np.random.randint(0, 2, size=1000) # 創建Queue,隊列中每一項包含一個輸入數據和相應的目標值 queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []]) # 批量入列數據(這是一個Operation) enqueue_op = queue.enqueue_many([data, target]) # 出列數據(這是一個Tensor定義) data_sample, label_sample = queue.dequeue() # 創建包含4個線程的QueueRunner qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) with tf.Session() as sess: # 創建Coordinator coord = tf.train.Coordinator() # 啟動QueueRunner管理的線程 enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # 主線程,消費100個數據 for step in range(100): if coord.should_stop(): break data_batch, label_batch = sess.run([data_sample, label_sample]) # 主線程計算完成,停止所有采集數據的進程 coord.request_stop() coord.join(enqueue_threads)
六 QueueRunner和Coordinator結合方式二
這一小節我們會使用QueueRunner和Coordinator來實現bin文件,以及csv文件、TFRecord格式文件的讀取,不過這里我們采用隱式創建線程的方法。在講解具體代碼之前,我們需要先來講解關於TensorFlow中的文件隊列機制和內存隊列機制。
首先需要思考的一個問題是,什么是數據讀取?以圖像數據為例,讀取數據的過程可以用下圖來表示:
假設我們的硬盤中有一個圖片數據集0001.jpg,0002.jpg,0003.jpg……我們只需要把它們讀取到內存中,然后提供給GPU或是CPU進行計算就可以了。這聽起來很容易,但事實遠沒有那么簡單。事實上,我們必須要把數據先讀入后才能進行計算,假設讀入用時0.1s,計算用時0.9s,那么就意味着每過1s,GPU都會有0.1s無事可做,這就大大降低了運算的效率。
如何解決這個問題?方法就是將讀入數據和計算分別放在兩個線程中,將數據讀入內存的一個隊列,如下圖所示:
讀取線程源源不斷地將文件系統中的圖片讀入到一個內存的隊列中,而負責計算的是另一個線程,計算需要數據時,直接從內存隊列中取就可以了。這樣就可以解決GPU因為IO而空閑的問題!
而在tensorflow中,為了方便管理,在內存隊列前又添加了一層所謂的“文件名隊列”。
為什么要添加這一層文件名隊列?我們首先得了解機器學習中的一個概念:epoch。對於一個數據集來講,運行一個epoch就是將這個數據集中的圖片全部計算一遍。如一個數據集中有三張圖片A.jpg、B.jpg、C.jpg,那么跑一個epoch就是指對A、B、C三張圖片都計算了一遍。兩個epoch就是指先對A、B、C各計算一遍,然后再全部計算一遍,也就是說每張圖片都計算了兩遍。
tensorflow使用文件名隊列+內存隊列雙隊列的形式讀入文件,可以很好地管理epoch。下面我們用圖片的形式來說明這個機制的運行方式。如下圖,還是以數據集A.jpg, B.jpg, C.jpg為例,假定我們要跑一個epoch,那么我們就在文件名隊列中把A、B、C各放入一次,並在之后標注隊列結束。
程序運行后,內存隊列首先讀入A(此時A從文件名隊列中出隊):
再依次讀入B和C:
此時,如果再嘗試讀入,系統由於檢測到了“結束”,就會自動拋出一個異常(OutOfRange)。外部捕捉到這個異常后就可以結束程序了。這就是tensorflow中讀取數據的基本機制。如果我們要跑2個epoch而不是1個epoch,那只要在文件名隊列中將A、B、C依次放入兩次再標記結束就可以了。
典型的文件數據讀取會包含下面這些步驟:
1.文件名列表
可以使用字符串張量(比如["file0", "file1"]
, [("file%d" % i) for i in range(2)]
, [("file%d" % i) for i in range(2)]
) 或者tf.train.match_filenames_once
()函數來產生文件名列表。
filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i) for i in xrange(1, 6)]
2.文件名隊列
對於文件名隊列,我們使用tf.train.string_input_producer()函數。這個函數需要傳入一個文件名list,系統會自動將它轉為一個先入先出的文件名隊列, 文件閱讀器會需要它來讀取數據。
# 同時打開多個文件,顯示創建Queue,同時隱含了QueueRunner的創建 filename_queue = tf.train.string_input_producer(filenames)
3.可配置的 文件名亂序(shuffling),可配置的最大訓練迭代數(epoch limit)
tf.train.string_input_producer還有兩個重要的參數,一個是num_epochs,它就是我們上文中提到的epoch數。另外一個就是shuffle,shuffle是指在一個epoch內文件的順序是否被打亂。若設置shuffle=False,如下圖,每個epoch內,數據還是按照A、B、C的順序進入文件名隊列,這個順序不會改變:
如果設置shuffle=True,那么在一個epoch內,數據的前后順序就會被打亂,如下圖所示:
在tensorflow中,內存隊列不需要我們自己建立,我們只需要使用reader對象從文件名隊列中讀取數據就可以了。
4.針對輸入文件格式的閱讀器
根據你的文件格式, 選擇對應的文件閱讀器, 然后將文件名隊列提供給閱讀器的read()
方法。閱讀器的read()
方法會輸出一個key來表征輸入的文件和其中的紀錄(對於調試非常有用),同時得到一個字符串標量, 這個字符串標量可以被一個或多個解析器,或者轉換操作將其解碼為張量並且構造成為樣本。
CSV 文件讀取
從CSV文件中讀取數據, 需要使用TextLineReader()
和decode_csv()
操作, 如下面的例子所示:我們需要從iris_0.csv和iris_1.csv文件讀取數據,iris_0.csv和iris_1.csv文件數據是從iris數據集中選取的部分數據,內容如下:總共有21條記錄。
# 同時打開多個文件(文件格式必須一樣),隱式示創建Queue,同時隱含了QueueRunner的創建 filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"]) reader = tf.TextLineReader() # Tensorflow的Reader對象可以直接接受一個Queue作為輸入 每次read
的執行都會從文件中讀取一行內容 key, value = reader.read(filename_queue) # 如果某一列為空,指定默認值,同時指定了默認列的類型 record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]] #decode_csv
操作會解析讀取的一行內容並將其轉為張量列表 col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults) features = [col1, col2, col3, col4] #獲取一行數據 #row = tf.decode_csv(value, record_defaults=record_defaults) with tf.Session() as sess: coord = tf.train.Coordinator() # 啟動計算圖中所有的隊列線程 調用tf.train.start_queue_runners
來將文件名填充到隊列,否則read
操作會被阻塞到文件名隊列中有值為止。 threads = tf.train.start_queue_runners(coord=coord) # 主線程,消費50個數據 for _ in range(50): example, label = sess.run([features, col5]) print('Step {0} {1} {2}'.format(_,example,label)) # 主線程計算完成,停止所有采集數據的進程 coord.request_stop()
# 指定等待某個線程結束 coord.join(threads)
- 在這個例子中,
tf.train.string_input_produecer()
會將一個隱含的QueueRunner添加到全局圖中(類似的操作還有tf.train.shuffle_batch()
等)。由於沒有顯式地返回QueueRunner()調用create_threads()啟動線程,這里使用了tf.train.start_queue_runners()
方法直接啟動tf.GraphKeys.QUEUE_RUNNERS
集合中的所有隊列線程。初學者會經常在代碼中看到tf.train.start_queue_runners()這個函數,但往往很難理解它的用處,在這里,有了上面的鋪墊后,我們就可以解釋這個函數的作用了。
在我們使用tf.train.string_input_producer創建文件名隊列后,整個系統其實還是處於“停滯狀態”的,也就是說,我們文件名並沒有真正被加入到隊列中(如下圖所示)。此時如果我們開始計算,因為內存隊列中什么也沒有,計算單元就會一直等待,導致整個系統被阻塞。

而使用tf.train.start_queue_runners()之后,才會啟動填充隊列的線程,這時系統就不再“停滯”。此后計算單元就可以拿到數據並進行計算,整個程序也就跑起來了,這就是函數tf.train.start_queue_runners的用處。

每次read()
的執行都會從文件中讀取一行內容, decode_csv()
操作會解析這一行內容並將其轉為張量列表。如果輸入的參數有缺失,record_default
參數可以根據張量的類型來設置默認值。
5.紀錄解析器(從bin文件讀入)
從二進制bin文件中讀取固定長度紀錄, 可以使用tf.FixedLengthRecordReader
的tf.decode_raw
操作。decode_raw
操作可以將一個字符串轉換為一個uint8的張量。
舉例來說,the CIFAR-10 dataset的文件格式定義是:每條記錄的長度都是固定的,一個字節的標簽,后面是3072字節的圖像數據。uint8的張量的標准操作就可以從中獲取圖像片並且根據需要進行重組。 例子代碼可以在tensorflow/models/image/cifar10/cifar10_input.py
找到。
def read_cifar10(filename_queue): """Reads and parses examples from CIFAR10 data files. Recommendation: if you want N-way read parallelism, call this function N times. This will give you N independent Readers reading different files & positions within those files, which will give better mixing of examples. Args: filename_queue: A queue of strings with the filenames to read from. Returns: An object representing a single example, with the following fields: height: number of rows in the result (32) width: number of columns in the result (32) depth: number of color channels in the result (3) key: a scalar string Tensor describing the filename & record number for this example. label: an int32 Tensor with the label in the range 0..9. uint8image: a [height, width, depth] uint8 Tensor with the image data """ class CIFAR10Record(object): pass result = CIFAR10Record() # Dimensions of the images in the CIFAR-10 dataset. # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the # input format. label_bytes = 1 # 2 for CIFAR-100 result.height = 32 result.width = 32 result.depth = 3 image_bytes = result.height * result.width * result.depth # Every record consists of a label followed by the image, with a # fixed number of bytes for each. record_bytes = label_bytes + image_bytes # Read a record, getting filenames from the filename_queue. No # header or footer in the CIFAR-10 format, so we leave header_bytes # and footer_bytes at their default of 0. reader = tf.FixedLengthRecordReader(record_bytes=record_bytes) result.key, value = reader.read(filename_queue) # Convert from a string to a vector of uint8 that is record_bytes long. record_bytes = tf.decode_raw(value, tf.uint8) # The first bytes represent the label, which we convert from uint8->int32. result.label = tf.cast( tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32) # The remaining bytes after the label represent the image, which we reshape # from [depth * height * width] to [depth, height, width]. depth_major = tf.reshape( tf.strided_slice(record_bytes, [label_bytes], [label_bytes + image_bytes]), [result.depth, result.height, result.width]) # Convert from [depth, height, width] to [height, width, depth]. result.uint8image = tf.transpose(depth_major, [1, 2, 0]) return result
read_cifar10()函數需要傳入一個文件名隊列,這個函數主要做了以下事情:
- 計算每個記錄(樣本)包含多少字節。一張圖像所占字節數 + 圖像標簽所占字節數。
- 每執行一次
read()
的執行都會從文件中讀取一行內容,decode_raw()
操作會解析這一行內容並將其轉為張量。 - 提取第一個字節,即標簽,並把類型從uint8->int32
- 提取剩下的字節,即圖像。
- 把圖像數據轉換為[height,width,depth]形狀。
- 返回一個對象resulit。包含兩個屬性(都是張量),result.uint8image包含一張形狀為[height,width,depth]的圖像,result.label存儲該圖像對應的標簽。
6.可配置的預處理器
你可以對輸入的樣本進行任意的預處理, 這些預處理不依賴於訓練參數,比如上面read_cifar10()的函數,獲取一張圖像數據張量后,我們可以對圖像進行處理,比如裁切,水平翻轉,以及對圖片進行歸一化處理等等。我們可以在tensorflow/models/image/cifar10/cifar10.py
找到數據歸一化, 提取隨機數據片,增加噪聲或失真等等預處理的例子。
7.批處理(TFRecord格式文件讀寫)
在數據輸入管道的末端, 我們需要有內存隊列來執行輸入樣本的批量讀取。我們使用tf.train.shuffle_batch()
函數來對內存隊列中的樣本進行亂序處理。
我們用一個具體的例子來演示一下tf.train.shuffle_batch()函數的使用。如圖,假設我們在當前文件夾中已經有A.、B.、C三個子文件夾。並在每個文件夾下下面放置對應的圖片。
針對這些文件我們需要做下面幾步處理:
- 讀取所有圖片文件,並存為TFRecord格式文件。
- 我們讀取記錄文中的數據。使用tf.TFRecordReader類創建一個文件讀取器,每執行一次read()方法會讀取一個樣本。
- 使用tf.train.shuffle_batch()函數每次讀取batch_size張圖像數據。
''' shuffle_batch()的使用 ''' import os import cv2 def write_binary(): ''' 將默認路徑下的所有圖片存為TFRecord格式 保存到文件data.tfrecord中 ''' #獲取當前工作目錄 cwd = os.getcwd() #當前目錄下的子目錄 一共有12張圖片 classes=['A','B','C'] #創建對象 用於向記錄文件寫入記錄 writer = tf.python_io.TFRecordWriter('data.tfrecord') #遍歷每一個子文件夾 for index, name in enumerate(classes): class_path = os.path.join(cwd,name) #遍歷子目錄下的每一個文件 for img_name in os.listdir(class_path): #每一個圖片全路徑 img_path = os.path.join(class_path , img_name) #讀取圖像 img = cv2.imread(img_path) #縮放 img1 = cv2.resize(img,(250,250)) #將圖片轉化為原生bytes img_raw = img1.tobytes() #將數據整理成 TFRecord 需要的數據結構 example = tf.train.Example(features=tf.train.Features(feature={ 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])), "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))})) #序列化 serialized = example.SerializeToString() #寫入文件 writer.write(serialized) writer.close() def read_and_decode(filename): ''' 讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標簽 args: filename:TFRecord格式文件路徑 ''' #創建文件隊列,不限讀取的數量 filename_queue = tf.train.string_input_producer([filename],shuffle=False) #創建一個文件讀取器 從隊列文件中讀取數據 reader = tf.TFRecordReader() #reader從 TFRecord 讀取內容並保存到 serialized_example中 _, serialized_example = reader.read(filename_queue) # 讀取serialized_example的格式 features = tf.parse_single_example( serialized_example, features={ 'label': tf.FixedLenFeature([], tf.int64), 'img_raw': tf.FixedLenFeature([], tf.string) } ) # 解析從 serialized_example 讀取到的內容 img=tf.decode_raw(features['img_raw'],tf.uint8) img = tf.reshape(img, [250, 250, 3]) label = tf.cast(features['label'], tf.int32) return img,label #將默認路徑下的所有圖片存為TFRecord格式 保存到文件data.tfrecord中 write_binary() #讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標簽 img,label = read_and_decode('data.tfrecord') ''' 讀取批量數據 這里設置batch_size=12,即一次從內存隊列中隨機讀取12張圖片,這讀取到的圖片可能有重復的, 這主要是因為設置內存隊列最小元素個數為100,最大元素個數為2000,而我們總共只有12張圖片,所以隊列中有許多重復的圖片 ''' img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #創建一個協調器,管理線程 coord = tf.train.Coordinator() #啟動QueueRunner, 此時文件名才開始進隊。 threads=tf.train.start_queue_runners(sess=sess,coord=coord) img, label = sess.run([img_batch, label_batch]) for i in range(12): cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i]) #終止線程 coord.request_stop() coord.join(threads)
運行后:
我們在讀取TFRecord文件時,一次讀取12張圖片,這主要是因為我們設置batch_size=12,並且我們可以看到讀取到的12張圖片是隨機,這里出現了重復的。
8.實驗
我們再用一個具體的例子感受tensorflow中的數據讀取。如圖,假設我們在當前文件夾中已經有A.jpg、B.jpg、C.jpg三張圖片,我們希望讀取這三張圖片5個epoch並且把讀取的結果重新存到read文件夾中。
''' 測試 ''' tf.reset_default_graph() # 新建一個Session with tf.Session() as sess: # 我們要讀三幅圖片A.jpg, B.jpg, C.jpg filename = ['A.jpg', 'B.jpg', 'C.jpg'] # string_input_producer會產生一個文件名隊列 filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5) # reader從文件名隊列中讀數據。對應的方法是reader.read reader = tf.WholeFileReader() key, value = reader.read(filename_queue) # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化 tf.local_variables_initializer().run() # 使用start_queue_runners之后,才會開始填充隊列 threads = tf.train.start_queue_runners(sess=sess) for i in range(15): # 獲取圖片數據並保存 image_data = sess.run(value) with open('read/test_%d.jpg' % (i+1), 'wb') as f: f.write(image_data)
我們這里使用filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)建立了一個會跑5個epoch的文件名隊列。並使用reader讀取,reader每次讀取一張圖片並保存。
運行代碼后,我們得到就可以看到read文件夾中的圖片,正好是按順序的5個epoch:
如果我們設置filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)中的shuffle=True,那么在每個epoch內圖像就會被打亂,如圖所示:
我們這里只是用三張圖片舉例,實際應用中一個數據集肯定不止3張圖片,不過涉及到的原理都是共通的。
完整代碼:

# -*- coding: utf-8 -*- """ Created on Wed May 2 20:39:25 2018 @author: zy """ import tensorflow as tf ''' TensorFlow中隊列的使用 ''' ''' 下面是一個單獨使用Queue的例子: ''' #創建的圖:一個先入先出隊列,以及初始化,出隊,+1,入隊操作 q = tf.FIFOQueue(3, "float") init = q.enqueue_many(([0.1, 0.2, 0.3],)) x = q.dequeue() y = x + 1 q_inc = q.enqueue([y]) #開啟一個session,session是會話,會話的潛在含義是狀態保持,各種tensor的狀態保持 with tf.Session() as sess: sess.run(init) for i in range(2): sess.run(q_inc) quelen = sess.run(q.size()) for i in range(quelen): print (sess.run(q.dequeue())) ''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 創建QueueRunner # 用多個線程向隊列添加數據 # 這里實際創建了4個線程,兩個增加計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) ''' #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啟動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue())) ''' # 主線程 sess = tf.Session() sess.run(tf.initialize_all_variables()) # 啟動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) # 主線程 for i in range(0, 10): print(sess.run(q.dequeue())) ''' Coordinator ''' import threading, time # 子線程函數 def loop(coord, id): t = 0 while not coord.should_stop(): print(id) time.sleep(1) t += 1 # 只有1號線程調用request_stop方法 if (t >= 2 and id == 0): coord.request_stop() # 主線程 coord = tf.train.Coordinator() # 使用Python API創建10個線程 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)] # 啟動所有線程,並等待線程結束 for t in threads: t.start() coord.join(threads) ''' QueueRunner和Coordinator結合方式一 ''' ''' import numpy as np # 1000個4維輸入向量,每個數取值為1-10之間的隨機數 data = 10 * np.random.randn(1000, 4) + 1 # 1000個隨機的目標值,值為0或1 target = np.random.randint(0, 2, size=1000) # 創建Queue,隊列中每一項包含一個輸入數據和相應的目標值 queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []]) # 批量入列數據(這是一個Operation) enqueue_op = queue.enqueue_many([data, target]) # 出列數據(這是一個Tensor定義) data_sample, label_sample = queue.dequeue() # 創建包含4個線程的QueueRunner qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) with tf.Session() as sess: # 創建Coordinator coord = tf.train.Coordinator() # 啟動QueueRunner管理的線程 enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # 主線程,消費100個數據 for step in range(100): if coord.should_stop(): break data_batch, label_batch = sess.run([data_sample, label_sample]) # 主線程計算完成,停止所有采集數據的進程 coord.request_stop() coord.join(enqueue_threads) ''' ''' QueueRunner和Coordinator結合的數據讀取機制 讀取CSV文件 ''' tf.reset_default_graph() # 同時打開多個文件(文件格式必須一樣),隱式創建Queue,同時隱含了QueueRunner的創建 filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"]) reader = tf.TextLineReader() # Tensorflow的Reader對象可以直接接受一個Queue作為輸入 每次read的執行都會從文件中讀取一行內容 key, value = reader.read(filename_queue) # 如果某一列為空,指定默認值,同時指定了默認列的類型 record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]] # decode_csv 操作會解析讀取的一行內容並將其轉為張量列表 col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults) features = [col1, col2, col3, col4] #獲取一行數據 #row = tf.decode_csv(value, record_defaults=record_defaults) with tf.Session() as sess: coord = tf.train.Coordinator() # 啟動計算圖中所有的隊列線程 調用tf.train.start_queue_runners來將文件名填充到隊列,否則read操作會被阻塞到文件名隊列中有值為止。 threads = tf.train.start_queue_runners(coord=coord) # 主線程,消費50個數據 for _ in range(50): example, label = sess.run([features, col5]) print('Step {0} {1} {2}'.format(_,example,label)) # 主線程計算完成,停止所有采集數據的進程 coord.request_stop() #指定等待某個線程結束 coord.join(threads) ''' shuffle_batch()的使用 ''' import os import cv2 def write_binary(): ''' 將默認路徑下的所有圖片存為TFRecord格式 保存到文件data.tfrecord中 ''' #獲取當前工作目錄 cwd = os.getcwd() #當前目錄下的子目錄 一共有12張圖片 classes=['A','B','C'] #創建對象 用於向記錄文件寫入記錄 writer = tf.python_io.TFRecordWriter('data.tfrecord') #遍歷每一個子文件夾 for index, name in enumerate(classes): class_path = os.path.join(cwd,name) #遍歷子目錄下的每一個文件 for img_name in os.listdir(class_path): #每一個圖片全路徑 img_path = os.path.join(class_path , img_name) #讀取圖像 img = cv2.imread(img_path) #縮放 img1 = cv2.resize(img,(250,250)) #將圖片轉化為原生bytes img_raw = img1.tobytes() #將數據整理成 TFRecord 需要的數據結構 example = tf.train.Example(features=tf.train.Features(feature={ 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])), "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))})) #序列化 serialized = example.SerializeToString() #寫入文件 writer.write(serialized) writer.close() def read_and_decode(filename): ''' 讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標簽 args: filename:TFRecord格式文件路徑 ''' #創建文件隊列,不限讀取的數量 filename_queue = tf.train.string_input_producer([filename],shuffle=False) #創建一個文件讀取器 從隊列文件中讀取數據 reader = tf.TFRecordReader() #reader從 TFRecord 讀取內容並保存到 serialized_example中 _, serialized_example = reader.read(filename_queue) # 讀取serialized_example的格式 features = tf.parse_single_example( serialized_example, features={ 'label': tf.FixedLenFeature([], tf.int64), 'img_raw': tf.FixedLenFeature([], tf.string) } ) # 解析從 serialized_example 讀取到的內容 img=tf.decode_raw(features['img_raw'],tf.uint8) img = tf.reshape(img, [250, 250, 3]) label = tf.cast(features['label'], tf.int32) return img,label tf.reset_default_graph() #將默認路徑下的所有圖片存為TFRecord格式 保存到文件data.tfrecord中 write_binary() #讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標簽 img,label = read_and_decode('data.tfrecord') ''' 讀取批量數據 這里設置batch_size=12,即一次從內存隊列中隨機讀取12張圖片,這讀取到的圖片可能有重復的, 這主要是因為設置內存隊列最小元素個數為100,最大元素個數為2000,而我們總共只有12張圖片,所以隊列中有許多重復的圖片 ''' img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #創建一個協調器,管理線程 coord = tf.train.Coordinator() #啟動QueueRunner, 此時文件名才開始進隊。 threads=tf.train.start_queue_runners(sess=sess,coord=coord) img, label = sess.run([img_batch, label_batch]) for i in range(12): cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i]) #終止線程 coord.request_stop() coord.join(threads) ''' 測試 ''' tf.reset_default_graph() # 新建一個Session with tf.Session() as sess: # 我們要讀三幅圖片A.jpg, B.jpg, C.jpg filename = ['A.jpg', 'B.jpg', 'C.jpg'] # string_input_producer會產生一個文件名隊列 filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5) # reader從文件名隊列中讀數據。對應的方法是reader.read reader = tf.WholeFileReader() key, value = reader.read(filename_queue) # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化 tf.local_variables_initializer().run() # 使用start_queue_runners之后,才會開始填充隊列 threads = tf.train.start_queue_runners(sess=sess) for i in range(15): # 獲取圖片數據並保存 image_data = sess.run(value) with open('read/test_%d.jpg' % (i+1), 'wb') as f: f.write(image_data)
七 讀取原始圖片轉換為小批量大小的樣本數據
假如我們現在需要對貓和狗的圖片進行分類,我們已經收集了許多貓和狗的圖片,首先我們需要建立一個文件夾命名為data,在該文件夾下面創建兩個子文件夾train,test分別用於保存測試集和訓練集圖片,然后還需要在每個文件夾下面創建兩個文件夾,分別命名cat和dog,用來存放對應類別的圖片。
有了這些圖片之后,我們想每次讀取指定batch_size大小得數據樣本,並且這些樣本是打亂的。
- 我們把這些文件保存為TFRecord格式文件
- 從TFRecord文件中讀取batch_size樣本集。
代碼如下:
# -*- coding: utf-8 -*- """ Created on Thu May 10 16:48:17 2018 @author: zy """ ''' 如何將給定的數據原始圖片以及標簽保存成TFRecord格式的文件 並使用隊列每次讀取batch_size大小樣本集 ''' import tensorflow as tf import os import cv2 import random import numpy as np #隨機種子,使得每次運行結果一樣 random.seed(0) def get_files(dirpath): ''' 獲取文件相對路徑和標簽(非one_hot) 返回一個元組 args: dirpath:數據所在的目錄 記做父目錄 假設有10類數據,則父目錄下有10個子目錄,每個子目錄存放着對應的圖片 ''' #保存讀取到的的文件和標簽 image_list = [] label_list = [] #遍歷子目錄 classes = [x for x in os.listdir(dirpath) if os.path.isdir(dirpath)] #遍歷每一個子文件夾 for index, name in enumerate(classes): #子文件夾路徑 class_path = os.path.join(dirpath,name) #遍歷子目錄下的每一個文件 for img_name in os.listdir(class_path): #每一個圖片全路徑 img_path = os.path.join(class_path , img_name) #追加 image_list.append(img_path) label_list.append(index) #保存打亂后的文件和標簽 images = [] labels = [] # 打亂文件順序 連續打亂兩次 indices = list(range(len(image_list))) random.shuffle(indices) for i in indices: images.append(image_list[i]) labels.append(label_list[i]) random.shuffle([images,labels]) print('樣本長度為:',len(images)) #print(images[0:10],labels[0:10]) return images, labels ''' 生成數據的格式 先生成 TFRecord 格式的樣例數據,Example 的結構如下,表示第1個文件中的第1個數據 { 'i':0, 'j':0 } ''' def WriteTFRecord(dirpath,dstpath='.',train_data=True,IMAGE_HEIGHT=227,IMAGE_WIDTH=227): ''' 把指定目錄下的數據寫入同一個TFRecord格式文件中 args: dirpath:數據所在的目錄 記做父目錄 假設有10類數據,則父目錄下有10個子目錄,每個子目錄存放着對應的圖片 dstpath:保存TFRecord文件的目錄 train_data:表示傳入的是不是訓練集文件所在路徑 IMAGE_HEIGHT:保存的圖片數據高度 IMAGE_WIDTH:保存的圖片數據寬度 ''' if not os.path.isdir(dstpath): os.mkdir(dstpath) #獲取所有數據文件路徑,以及對應標簽 image_list, label_list = get_files(dirpath) #把海量數據寫入多個TFrecord文件 length_per_shard = 10000 #每個記錄文件的樣本長度 num_shards = int(np.ceil(len(image_list) / length_per_shard)) print('記錄文件個數:',num_shards) ''' 當所有數類別圖片都在一個文件夾下面時,可以將數據寫入不同的文件 但是如果同一類別的圖片放在相同的文件下,就不可以將數據寫入不同的文件 這主要是因為后者保存的TFRecord文件中都是同一類別,而隊列取數據時,是從一個文件讀取完,才會讀取另一個文件, 這樣會導致一次讀取的batch_size圖像都是同一類別,對訓練不利 因此我們必須想個辦法讓一個TFRecord格式的文件包含各種類別的圖片,並且順序是打亂的 ''' #依次寫入每一個TFRecord文件 for index in range(num_shards): #按0000n-of-0000m的后綴區分文件。n代表當前文件標號,沒代表文件總數 if train_data: filename = os.path.join(dstpath,'train_data.tfrecord-%.5d-of-%.5d'%(index,num_shards)) else: filename = os.path.join(dstpath,'test_data.tfrecord-%.5d-of-%.5d'%(index,num_shards)) print(filename) #創建對象 用於向記錄文件寫入記錄 writer = tf.python_io.TFRecordWriter(filename) #起始索引 idx_start = index*length_per_shard #結束索引 idx_end = np.min([(index+1)*length_per_shard - 1,len(image_list)]) #遍歷子目錄下的每一個文件 for img_path,label in zip(image_list[idx_start:idx_end], label_list[idx_start:idx_end]): #讀取圖像 img = cv2.imread(img_path) ''' 在這里可以對圖片進行處理,也可以擴大數據集,或者歸一化輸入等待,不過我在這里不對原始圖片進行其它處理,只是把圖片大小設置為固定的 ''' #縮放 img = cv2.resize(img,(IMAGE_HEIGHT,IMAGE_WIDTH)) #將圖片轉化為原生bytes image = img.tobytes() #將數據整理成 TFRecord 需要的數據結構 example = tf.train.Example(features=tf.train.Features(feature={ 'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])), "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))})) #序列化 serialized = example.SerializeToString() #寫入文件 writer.write(serialized) writer.close() def read_and_decode(filename,num_epochs = None,IMAGE_HEIGHT=227,IMAGE_WIDTH=227): ''' 讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標簽 args: filename:TFRecord格式文件路徑 list列表 num_epochs:每個數據集文件迭代輪數 IMAGE_HEIGHT:保存的圖片數據高度 IMAGE_WIDTH:保存的圖片數據寬度 ''' ''' 創建文件隊列,通過設置 shuffle 參數為 True,將文件的入隊順序打亂,所以出隊順序是隨機的。隨機打亂文件順序和入隊操作 會跑在一個單獨的線程上,不會影響出隊的速度. 當輸入隊列中的所有文件都處理完后,它會將文件列表中的文件重新加入隊列。可以通過設置 num_epochs 參數來限制加載初始 文件列表的最大輪數 ''' filename_queue = tf.train.string_input_producer(filename,shuffle=False,num_epochs = num_epochs) #創建一個文件讀取器 從隊列文件中讀取數據 reader = tf.TFRecordReader() #reader從 TFRecord 讀取內容並保存到 serialized_example中 _, serialized_example = reader.read(filename_queue) # 讀取serialized_example的格式 features = tf.parse_single_example( serialized_example, features={ 'image': tf.FixedLenFeature([], tf.string), 'label': tf.FixedLenFeature([], tf.int64) } ) # 解析從 serialized_example 讀取到的內容 img = tf.decode_raw(features['image'],tf.uint8) img = tf.reshape(img, [IMAGE_HEIGHT, IMAGE_WIDTH, 3]) ''' 在這里可以對讀取到的圖片數據進行預處理,比如歸一化輸入,PCA處理等,但是不可以增加數據 ''' label = tf.cast(features['label'], tf.int32) return img,label def input_data(filenames,num_epochs=None,batch_size=256, capacity=4096, min_after_dequeue=1024, num_threads=10): ''' 讀取小批量batch_size數據 args: filenames:TFRecord文件路徑組成的list num_epochs:每個數據集文件迭代輪數 batch_size:小批量數據大小 capacity:內存隊列元素最大個數 min_after_dequeue:內存隊列元素最小個數 num_threads:線城數 ''' ''' 讀取批量數據 這里設置batch_size,即一次從內存隊列中隨機讀取batch_size張圖片,這里設置內存隊列最小元素個數為1024,最大元素個數為4096 shuffle_batch 函數會將數據順序打亂 bacth 函數不會將數據順序打亂 ''' img,label = read_and_decode(filenames,num_epochs) images_batch, labels_batch = tf.train.shuffle_batch([img,label], batch_size=batch_size, capacity=capacity, min_after_dequeue=batch_size*5, num_threads=num_threads) return images_batch,labels_batch def file_match(s,root='.'): ''' 尋找指定目錄下(不包含子目錄)中的文件名含有指定字符串的文件,並打印出其相對路徑 args: s:要匹配的字符串 root : 指定要搜索的目錄 return:返回符合條件的文件列表 ''' #用來保存目錄 dirs=[] #用來保存匹配字符串的文件 matchs=[] for current_name in os.listdir(root): add_root_name = os.path.join(root,current_name) if os.path.isdir(add_root_name): dirs.append(add_root_name) elif os.path.isfile(add_root_name) and s in add_root_name: matchs.append(add_root_name) ''' #這里用來遞歸搜索子目錄的 for dir in dirs: file_match(s,dir) ''' return matchs ''' 測試 ''' if __name__ == '__main__': #訓練集數據所在的目錄 dirpath = './data/train' training_step = 1 ''' 判斷訓練測試集TFRecord格式文件是否存在,不存在則生成 如果存在,直接讀取 ''' # 獲取當前目錄下包含指定字符串的文件列表 files = file_match('train_data.tfrecord') #判斷數據集是否存在 if len(files) == 0: print('開始讀圖片文件並寫入TFRecord格式文件中.........') #將指定路徑下所有圖片存為TFRecord格式 保存到文件data.tfrecord中 WriteTFRecord(dirpath) print('寫入完畢!\n') #正則表達式匹配 files = tf.train.match_filenames_once('./train_data.tfrecord') #讀取TFRecord格式格式文件,返回讀取到的batch_size圖像以及對應的標簽 images_batch, labels_batch = input_data(files) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #創建一個協調器,管理線程 coord = tf.train.Coordinator() #啟動QueueRunner, 此時文件名才開始進隊 threads = tf.train.start_queue_runners(sess=sess,coord=coord) print('開始訓練!\n') for step in range(training_step): img, label = sess.run([images_batch, labels_batch]) print('setp :',step) for i in range(256): cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i]) #終止線程 coord.request_stop() coord.join(threads)
程序運行后,會生成三個TFRecord文件,如下:
並且我們生成了了一個小批量樣本大小的樣本圖片:
參考文章
[1]Tensorflow讀取數據的4種方式(8)---《深度學習》