『TensorFlow』讀書筆記_TFRecord學習


一、程序介紹

1、包導入

# Author : Hellcat
# Time   : 17-12-29


import os
import numpy as np 
np.set_printoptions(threshold=np.inf)
import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
from tensorflow.examples.tutorials.mnist import input_data

2、TFRecord錄入格式轉換

TFRecord的錄入格式是確定的,整數或者二進制,在train函數中能查看所有可以接受類型

def _int64_feature(value):
    """生成整數數據屬性"""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
    """生成字符型數據屬性"""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

3、TFRecord文件寫入測試

將mnist數據以每張圖片為單位寫入同一個TFR文件,

實際上就是每次把一個圖片相關信息都寫入,注意文件類型,二級制數據需要以string的格式保存

def TFRecord_write():
    """將mnist數據集寫入TFR文件"""
    mnist = input_data.read_data_sets('./Data_Set/Mnist_data',
                                      dtype=tf.uint8,one_hot=True)

    images = mnist.train.images
    labels = mnist.train.labels
    pixels = images.shape[1]                     # 784
    num_examples = mnist.train.num_examples      # 55000

    # TFRecords文件地址
    filename = './TFRecord_Output/mnist_train.tfrecords'

    if not os.path.exists('./TFRecord_Output/'):
        os.makedirs('./TFRecord_Output/')

    # 創建一個writer書寫文件
    writer = tf.python_io.TFRecordWriter(filename)
    for index in range(num_examples):
        # 提取單張圖像矩陣並轉換為字符串
        image_raw = images[index].tostring()
        # 將單張圖片相關數據寫入TFR文件
        example = tf.train.Example(features=tf.train.Features(feature={
            'pixels':  _int64_feature(pixels),
            'label':   _int64_feature(np.argmax(labels[index])),
            'img_raw': _bytes_feature(image_raw)
        }))
        writer.write(example.SerializeToString())  # 序列化為字符串
    writer.close()

調用,

if __name__=='__main__':
    TFRecord_write()

輸出如下,

4、TFRecord文件讀取測試

實際的讀取基本單位和存入的基本單位是一一對應的,當然也可以復數讀取,但是由於tf后續有batch拼接的函數,所以意義不大

def TFRecord_read():
    """從TFR文件讀取mnist數據集合"""
    # 創建一個reader讀取文件
    reader = tf.TFRecordReader()
    # 創建讀取文件隊列維護文件列表
    filename_queue = tf.train.string_input_producer(['./TFRecord_Output/mnist_train.tfrecords'])

    # 讀取數據
    # 每次讀取一個
    # _, serialized_example = reader.read(filename_queue)
    # 每次讀取多個
    _, serialized_example = reader.read_up_to(filename_queue,10)

    # 解析樣例
    # 解析函數選擇必須和上面讀取函數選擇相一致
    # 解析單個樣例
    # features = tf.parse_single_example(
    # 同時解析所有樣例
    features = tf.parse_example(
        serialized_example,
        features={
            'img_raw': tf.FixedLenFeature([],tf.string),
            'pixels':    tf.FixedLenFeature([],tf.int64),
            'label':    tf.FixedLenFeature([],tf.int64),
        })
    # 解析二進制數據格式,將之按照uint8格式解析
    images = tf.decode_raw(features['img_raw'],tf.uint8)
    labels = tf.cast(features['label'],tf.int32)
    pixels = tf.cast(features['pixels'],tf.int32)

    batch_size = 2
    capacity = 1000 + 3 * batch_size

    images.set_shape([10,784])
    labels.set_shape(10)
    pixels.set_shape(10)
    image_batch, label_batch, pixel_batch = tf.train.batch(
        [images, labels, pixels], batch_size=batch_size, capacity=capacity)
    # 線程控制器
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess,coord=coord) # 這里指代的是讀取數據的線程,如果不加的話隊列一直掛起
    for i in range(10):
        # print(images, labels, pixels)
        # print(sess.run(images))
        image, label, pixel = sess.run([image_batch,label_batch,pixel_batch])
        # image, label, pixel = sess.run([images,labels,pixels])
        print(image.shape,label,pixel)

輸出,

拼接batch尺寸為2,每次讀取10個數據

可以看到,這里batch尺寸指定的實際上是讀取次數

(2, 10, 784)

[[7 3 4 6 1 8 1 0 9 8]
 [0 3 1 2 7 0 2 9 6 0]]

[[784 784 784 784 784 784 784 784 784 784]
 [784 784 784 784 784 784 784 784 784 784]]
……

注意讀取數目和解析數目選擇的函數是要對應的,

# 讀取數據
# 每次讀取一個
# _, serialized_example = reader.read(filename_queue)
# 每次讀取多個,這里指定10個
_, serialized_example = reader.read_up_to(filename_queue,10)

# 解析樣例
# 解析函數選擇必須和上面讀取函數選擇相一致
# 解析單個樣例
# features = tf.parse_single_example()
# 同時解析所有樣例
features = tf.parse_example()

 值得注意的是這句,

threads = tf.train.start_queue_runners(sess=sess,coord=coord)

雖然后續未必會調用(coord實際上還是會調用用於協調停止),但實際上控制着隊列的數據讀取部分的啟動,注釋掉后會導致隊列有出無進進而掛起。

5、TFRecord文件批量生成

def TFR_gen():
    """TFR樣例數據生成"""
    # 定義寫多少個文件(數據量大時可以寫入多個文件加速)
    num_shards = 2
    # 定義每個文件中放入多少數據
    instances_per_shard = 2
    for i in range(num_shards):
        file_name = './TFRecord_Output/data.tfrecords-{}-of-{}'.format(i,num_shards)
        writer = tf.python_io.TFRecordWriter(file_name)
        for j in range(instances_per_shard):
            example = tf.train.Example(features=tf.train.Features(feature={
                'i':_int64_feature(i),
                'j':_int64_feature(j),
                'list':_bytes_feature(bytes([1,2,3]))
            }))
            writer.write(example.SerializeToString())  # 序列化為字符串
        writer.close()

 輸出如下,

6、TFRecord文件讀取測試

def TFR_load():
    """批量載入TFR數據"""
    # 匹配文件名
    files = tf.train.match_filenames_once('./TFRecord_Output/data.tfrecords-*')
    import glob
    # files = glob.glob('./TFRecord_Output/data.tfrecords-*')
    # 載入文件名
    filename_queue = tf.train.string_input_producer(files,shuffle=True)

    reader = tf.TFRecordReader()
    _,serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(
        serialized_example,
        features={
            'i':tf.FixedLenFeature([],tf.int64),
            'j':tf.FixedLenFeature([],tf.int64),
            'list':tf.FixedLenFeature([],tf.string)
        })
    '''
    # tf.train.match_filenames_once操作中產生了變量
    # 值得注意的是局部變量,需要用下面的初始化函數初始化
    sess.run(tf.local_variables_initializer())
    print(sess.run(files))
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess,coord=coord)
    for i in range(6):
        print(sess.run([features['i'],features['j']]))
    coord.request_stop()
    coord.join(threads)
    '''

    example, label, array = features['i'], features['j'], features['list']
    # 每個batch的中樣例的個數
    batch_size = 3
    # 隊列中樣例的個數
    capacity = 1000 + 3 * batch_size

    suffer = False
    # batch操作實際代指的就是數據讀取和預處理操作
    if suffer is not True:
        example_batch, label_batch, array_batch = tf.train.batch(
            [example, label, array], batch_size=batch_size, capacity=capacity)
    else:
        # 不同線程處理各自的文件
        # 隨機包含各個線程選擇文件名的隨機和文件內部數據讀取的隨機
        example_batch, label_batch, array_batch = tf.train.shuffle_batch(
            [example, label, array], batch_size=batch_size, capacity=capacity,
            min_after_dequeue=30)


    sess.run(tf.local_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)  # 這里指代的是讀取數據的線程,如果不加的話隊列一直掛起
    for i in range(2):
        cur_example_batch, cur_label_batch, cur_array_batch = sess.run([example_batch, label_batch, array_batch])
        print(cur_example_batch, cur_label_batch, cur_array_batch)

    coord.request_stop()
    coord.join(threads)

注意下面介紹,

# tf.train.match_filenames_once操作中產生了變量
# 值得注意的是局部變量,需要用下面的初始化函數初始化
sess.run(tf.local_variables_initializer())

batch生成的兩個函數如下,

suffer = False
# batch操作實際代指的就是數據讀取和預處理操作
if suffer is not True:
   example_batch, label_batch, array_batch = tf.train.batch(
       [example, label, array], batch_size=batch_size, capacity=capacity)
else:
    # 不同線程處理各自的文件
    # 隨機包含各個線程選擇文件名的隨機和文件內部數據讀取的隨機
    example_batch, label_batch, array_batch = tf.train.shuffle_batch(
        [example, label, array], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=30)
  • 單一文件多線程,那么選用tf.train.batch(需要打亂樣本,有對應的tf.train.shuffle_batch)
  • 多線程多文件的情況,一般選用tf.train.batch_join來獲取樣本(打亂樣本同樣也有對應的tf.train.shuffle_batch_join使用)

二、batch和batch_join的說明

1、文件准備

$ echo -e "Alpha1,A1\nAlpha2,A2\nAlpha3,A3" > A.csv  
$ echo -e "Bee1,B1\nBee2,B2\nBee3,B3" > B.csv  
$ echo -e "Sea1,C1\nSea2,C2\nSea3,C3" > C.csv  
$ cat A.csv  
Alpha1,A1  
Alpha2,A2  
Alpha3,A3  

2、單個Reader,單個樣本

import tensorflow as tf  
# 生成一個先入先出隊列和一個QueueRunner  
filenames = ['A.csv', 'B.csv', 'C.csv']  
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)  
# 定義Reader  
reader = tf.TextLineReader()  
key, value = reader.read(filename_queue)  
# 定義Decoder  
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])  
# 運行Graph  
with tf.Session() as sess:  
    coord = tf.train.Coordinator()  #創建一個協調器,管理線程  
    threads = tf.train.start_queue_runners(coord=coord)  #啟動QueueRunner, 此時文件名隊列已經進隊。  
    for i in range(10):  
        print example.eval()   #取樣本的時候,一個Reader先從文件名隊列中取出文件名,讀出數據,Decoder解析后進入樣本隊列。  
    coord.request_stop()  
    coord.join(threads)  
# outpt  
# Alpha1  
# Alpha2  
# Alpha3  
# Bee1  
# Bee2  
# Bee3  
# Sea1  
# Sea2  
# Sea3  
# Alpha1  

3、單個Reader,多個樣本

import tensorflow as tf  
filenames = ['A.csv', 'B.csv', 'C.csv'] 
## filenames = tf.train.match_filenames_once('.\data\*.csv') 
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)  
reader = tf.TextLineReader()  
key, value = reader.read(filename_queue)  
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])  
# 使用tf.train.batch()會多加了一個樣本隊列和一個QueueRunner。Decoder解后數據會進入這個隊列,再批量出隊。  
# 雖然這里只有一個Reader,但可以設置多線程,相應增加線程數會提高讀取速度,但並不是線程越多越好。  
example_batch, label_batch = tf.train.batch(  
      [example, label], batch_size=5)  
with tf.Session() as sess:  
    coord = tf.train.Coordinator()  
    threads = tf.train.start_queue_runners(coord=coord)  
    for i in range(10):  
        print example_batch.eval()  
    coord.request_stop()  
    coord.join(threads)  
# output  
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']  
# ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']  
# ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']  
# ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']  
# ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']  
# ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3']  
# ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2']  
# ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1']  
# ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3']  
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] 

4、多Reader,多個樣本

import tensorflow as tf  
filenames = ['A.csv', 'B.csv', 'C.csv']  
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)  
reader = tf.TextLineReader()  
key, value = reader.read(filename_queue)  
record_defaults = [['null'], ['null']]  
example_list = [tf.decode_csv(value, record_defaults=record_defaults)  
                  for _ in range(2)]  # Reader設置為2  
# 使用tf.train.batch_join(),可以使用多個reader,並行讀取數據。每個Reader使用一個線程。  
example_batch, label_batch = tf.train.batch_join(  
      example_list, batch_size=5)  
with tf.Session() as sess:  
    coord = tf.train.Coordinator()  
    threads = tf.train.start_queue_runners(coord=coord)  
    for i in range(10):  
        print example_batch.eval()  
    coord.request_stop()  
    coord.join(threads)  

# output  
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']  
# ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']  
# ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']  
# ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']  
# ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']  
# ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3']  
# ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2']  
# ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1']  
# ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3']  
# ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']  

tf.train.batchtf.train.shuffle_batch'數是單個Reader讀取,但是可以多線程。tf.train.batch_join'tf.train.shuffle_batch_join可設置多Reader讀取,每個Reader使用一個線程。至於兩種方法的效率,單Reader時,2個線程就達到了速度的極限。多Reader時,2個Reader就達到了極限。所以並不是線程越多越快,甚至更多的線程反而會使效率下降。

 

在這個例子中, 雖然只使用了一個文件名隊列, 但是TensorFlow依然能保證多個文件閱讀器從同一次迭代(epoch)的不同文件中讀取數據,知道這次迭代的所有文件都被開始讀取為止。(通常來說一個線程來對文件名隊列進行填充的效率是足夠的)

另一種替代方案是: 使用tf.train.shuffle_batch 函數,設置num_threads的值大於1。 這種方案可以保證同一時刻只在一個文件中進行讀取操作(但是讀取速度依然優於單線程),而不是之前的同時讀取多個文件。這種方案的優點是:

  • 避免了兩個不同的線程從同一個文件中讀取同一個樣本。
  • 避免了過多的磁盤搜索操作。

簡單來說,

單一文件多線程,那么選用tf.train.batch(需要打亂樣本,有對應的tf.train.shuffle_batch)

多線程多文件的情況,一般選用tf.train.batch_join來獲取樣本(打亂樣本同樣也有tf.train.shuffle_batch_join)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM