Python之TensorFlow的數據的讀取與存儲-2


  一、我們都知道Python由於GIL的原因導致多線程並不是真正意義上的多線程。但是TensorFlow在做多線程使用的時候是吧GIL鎖放開了的。所以TensorFlow是真正意義上的多線程。這里我們主要是介紹queue式的多線程運行方式。

  二、了解一下TensorFlow多線程queue的使用過程

  tensorflow:
        多線程是真正的多線程執行。
        隊列:
            tf.FIFOQueue(<capacity>, <dtypes>, <name>), 先進先出
            tf.RandomShuffleQueue, 隨機出隊列
        多線程:
            當數據量很大時,入隊操作從硬盤中讀取,放入內存。主線程需要等待操作完成,才能訓練。
            使用多線程,可以達到邊處理,邊訓練的異步效果。
        隊列管理器(棄用):
            tf.train.QueueRunner(<queue>, <enqueue_ops>)
            enqueue_ops: 添加線程的隊列操作列表[]*2為開啟2個線程,[]內為操作部分
            method:
                create_threads(<sess>, <coord>, <start>):
                    創建線程來運行給定的入隊操作。
                    start: 布爾值,是否啟動線程
                    coord: 線程協調器
                    return: 線程實例
        線程協調器:
            協調線程之間終止

  注意:這里使用的是TensorFlow1.0版本,在后續的版本中基本 把這種方式廢棄了。但是這里為了好的了解文件讀取的方式,我們使用queue式的多線程來執行。

import tensorflow as tf

def queue_demo():

    # 1、聲明隊列
    queue = tf.FIFOQueue(3, dtypes=tf.float32)

    # 2、加入數據
    init_queue = queue.enqueue_many([[0.1, 0.2, 0.3]])

    # 3、取出數據
    data = queue.dequeue()

    # 4、處理數據
    en_queue = queue.enqueue(data + 1)

    with tf.Session() as sess:
        # 初始化操作
        sess.run(init_queue)
        # 循環
        for i in range(10):
            sess.run(en_queue)
        for i in range(queue.size().eval()):
            print(queue.dequeue().eval())
import tensorflow as tf

def queue_thread_demo():
    # 1、聲明隊列
    queue = tf.FIFOQueue(100, dtypes=tf.float32)

    # 2、加入數據
    for i in range(100):
        queue.enqueue((i + 1)/100)

    # 3、操作
    data = queue.dequeue()
    en_queue = queue.enqueue(data + 1)

    # 3、定義隊列管理器
    qr = tf.train.QueueRunner(queue, enqueue_ops=[en_queue] * 2)

    with tf.Session() as sess:
        # 開啟線程協調器
        coord = tf.train.Coordinator()
        # 開啟線程
        threads = qr.create_threads(sess, coord=coord, start=True)
        for i in range(100):
            print(sess.run(queue.dequeue()))
        # 注:沒有線程協調器,主線程結束,會結束session,導致異常。
        coord.request_stop()
        coord.join(threads)  

  三、了解基本的數據讀取過程和api

  文件io:
        1、csv文件讀取一行 2、二進制文件指定bytes 3、圖片文件一張一張
        流程:
            1、構造一個文件隊列
            2、讀取文件內容
            3、解碼文件內容
            4、批處理
        api:
            1、文件隊列構造
                tf.train.string_input_producer(<string_tensor>, <shuffle=True>)
                string_tensor: 含有文件名的一階張量
                num_epochs: 過幾遍數據,默認無數遍
            2、文件閱讀器
                tf.TextLineReader、csv文件格式類型
                tf.FixedLengthRecordReader(record_bytes)、讀取固定值的二進制文件
                tf.TFRecordReader、讀取TfRecords
                共同:
                    read(file_queue): 隊列中指定數量
                    return: Tensors 元組(key:文件名, value默認行內容)
            3、文件解碼器:
                tf.decode_csv(<records>, <record_defaults=None>, <field_delim=None>, <name=None>)
                將CSV轉換為張量,與tf.TextLineReader搭配使用
                records: tensor型字符串,每一個字符串為CSV中的記錄
                record_defaults: 參數決定了所有張量的類型,並設置一個值在輸入字符串中缺少使用默認值
                tf.decode_raw(<bytes>, <out_type>, <little_endian=None>, <name=None>)
                將字節轉換為一個向量表示,字節為一字符串類型的張量,與函數tf.FixedLengthRecordReader搭配使用,二進制讀取為utf-8格式 

  在讀取文件之間先了解批處理的作用,主要是講每次讀出來的數據,緩存,然后到達一個批次,統一訓練

    管道讀端批處理:
            tf.train.batch(<tensors>, <batch_size>, <num_threads=1>, <capacity=32>, <name=None>)
            tensors: 張量列表
            tf.train.shuffle_batch(<tensors>, <batch_size>, <capacity>, <min_dequeue>)
            min_dequeue: 留下隊列里的張量個數,能夠保持隨機打亂

  四、csv文件讀取

    csv文件讀取:
            1、找到文件,構建列表
            2、構造文件隊列
            3、構造閱讀器,讀取隊列內容
            4、解碼內容
            5、批處理
import os
import tensorflow as tf

def csv_io():
    # 1、找到文件,加入隊列
    file_names = os.listdir("data/csv")
    file_list = [os.path.join("data/csv", file_name) for file_name in file_names]
    file_queue = tf.train.string_input_producer(file_list)
    # 2、讀取一行數據
    reader = tf.TextLineReader()
    key, value = reader.read(file_queue)
    # 3、解碼csv
    records = [[-1], [-1]]
    num1, num2 = tf.decode_csv(value, record_defaults=records)
    # 4、批處理
    num1_batch, num2_batch = tf.train.batch([num1, num2], batch_size=9, num_threads=1, capacity=9)

    with tf.Session() as sess:
        # 加入線程協調器
        coord = tf.train.Coordinator()
        # 線程運行
        threads = tf.train.start_queue_runners(sess, coord=coord)
        print(sess.run([num1_batch, num2_batch]))

        # 子線程回收
        coord.request_stop()
        coord.join(threads)

  五、圖片文件讀取

圖片讀取:
            每一個樣本必須保證特征數量一樣
            特征值:像素值
                單通道:灰度值(黑白圖片,像素中只有一個值) 三通道:RGB(每個像素都有3個值)
            三要素:長度寬度、通道值
            圖像的基本操作:
                目的:
                    1、增加圖片數據的統一性
                    2、所有圖片裝換成指定大小
                    3、縮小圖片數據量,防止增加開銷
                操作:
                    縮小圖片大小
                api:
                    圖片縮放:
                        tf.image.resize_images(<images>, <size>)
                        <images>:4-D形狀[batch, height, width, channels]/3-D[height, width, channels]
                        <size>:1-D int32張量:new_height, new_width, 圖像的新尺寸
                        return:4-D/3-D格式圖片
            圖片讀取api:
                tf.WholeFileReader:
                    將文件的全部內容作為輸入的讀取器
                    return:讀取器實例
                    read(<file_queue>):輸出將一個文件名(key)和該文件的內容值
            圖像解碼器:
                tf.image.decode_jpeg(<contents>):
                    將JPEG編碼的圖像解碼為unit8張量
                    return:uint8張量,3-D形狀[height, width, channels]
                tf.image.decode_png():
                    將PNG編碼的圖像解碼為uint8/uint16的張量
                    return:張量類型,3-D[height, width, channels]
import os
import tensorflow as tf

def image_io():
    # 1、讀取文件放入隊列
    image_names = os.listdir("data/image")
    image_files = [os.path.join("data/image", image_name) for image_name in image_names]
    image_queue = tf.train.string_input_producer(image_files)

    # 2、讀取一張圖片數據
    reader = tf.WholeFileReader()
    # value:一整張圖片的數據
    key, value = reader.read(image_queue)

    # 3、解碼
    image = tf.image.decode_jpeg(value)
    print(image)

    # 4、處理圖片的大小
    new_image = tf.image.resize_images(image, [350, 350])
    print(new_image)
    # 注意一定要固定形狀,批處理的時候所有數據必須固定
    new_image.set_shape([350, 350, 3])
    print(new_image)

    # 5、批處理
    image_batch = tf.train.batch([new_image], batch_size=2, num_threads=1, capacity=2)

    # 6、運行
    with tf.Session() as sess:
        # 加入線程協調器
        coord = tf.train.Coordinator()
        # 線程運行
        threads = tf.train.start_queue_runners(sess, coord=coord)
        print(sess.run([image_batch]))

        # 子線程回收
        coord.request_stop()
        coord.join(threads)

  六、二進制文件讀取

  二進制文件讀取:
            api:
                tf.FixedLengthRecordReader(<record_bytes>)
                record_bytes:數據長度
            解碼器:
                tf.decode_raw(<bytes>, <out_type>, <little_endian=None>, <name=None>)
                bytes:數據
                out_type:輸出類型
import os
import tensorflow as tf

def cifar_io():
    # 1、讀取文件加入隊列
    cifar_names = os.listdir("data/cifar")
    cifar_files = [os.path.join("data/cifar", cifar_name) for cifar_name in cifar_names if cifar_name.endswith(".bin") and cifar_name != "test_batch.bin"]
    file_queue = tf.train.string_input_producer(cifar_files)

    # 2、讀取二進制文件
    reader = tf.FixedLengthRecordReader(record_bytes=(32 * 32 * 3 + 1))
    key, value = reader.read(file_queue)

    # 3、解碼數據(二進制數據)
    # 樣本數據集根據具體數據處理,這里的數據為第一個數據為目標值,后面的為圖片數據
    target_image = tf.decode_raw(value, tf.uint8)

    # 4、分割數據
    target = tf.slice(target_image, [0], [1])
    image = tf.slice(target_image, [1], [32 * 32 * 3])

    # 5、特征數據形狀改變
    new_image = tf.reshape(image, [32, 32, 3])
    print(new_image)

    # 6、批處理
    image_batch, target_batch = tf.train.batch([new_image, target], batch_size=10, capacity=10)
    print(image_batch, target_batch)

    # 7、運行
    with tf.Session() as sess:
        # 線程協調器
        coord = tf.train.Coordinator()
        # 線程運行
        threads = tf.train.start_queue_runners(sess, coord=coord)
        print(sess.run([image_batch, target_batch]))

        # 子線程回收
        coord.request_stop()
        coord.join(threads)

  七、上面說完了,常用文件讀取的方式,下面說一下TensorFlow文件的存儲與讀取的方式。TensorFlow一般采用*.threcords文件格式進行保存。它是一種內置文件格式,是一種二進制文件,它可以更好的利用內存,更方便的復制和移動。

  tf.TFRecordReader
            一種內置文件格式,是一種二進制文件,它可以更好的利用內存,更方便的復制和移動
            為了將二進制數據和標簽(訓練類別標簽),數據存儲在同一文件中
            分析、存取
            文件格式:*.threcords
            寫入文件內容:example協議塊
            TF存儲:
                TFRecord存儲器
                    tf.python_io.TFRecordWriter(<path>)
                    method:
                        write(record)
                        close
                Example協議塊:
                    tf.train.Example(<features=None>)
                    features:tf.train.Features(<feature=None>)實例
                        feature:字典數據,key為要保存的數據
                            tf.train.Feature(<**options>)
                                **options:
                                    tf.train.ByteList(<value=[Bytes]>)
                                    tf.train.IntList(<value=[Value]>)
                                    tf.train.FloatList(<value=[Value]>)
                        return:Features實例
                    return:Example協議塊
            TF讀取:
                tf.parse_example(<serialized>, <features=None>, <name=None>)
                    serialized:標量字符串Tensor,一個序列化的Example
                    features:dict字典數據,鍵為讀取的名字,值為FixedLenFeature
                    return:一個鍵值對組成的字典,鍵為讀取的名字
                    tf.FixedLenFeature(<shape>, <dtype>)
                        shape:形狀
                        dtype:數據類型(float32/int64/string)
import os
import tensorflow as tf

def tf_records_io():
    # 1、讀取文件加入隊列
    cifar_names = os.listdir("data/cifar")
    cifar_files = [os.path.join("data/cifar", cifar_name) for cifar_name in cifar_names if
                   cifar_name.endswith(".bin") and cifar_name != "test_batch.bin"]
    file_queue = tf.train.string_input_producer(cifar_files)

    # 2、讀取二進制文件
    reader = tf.FixedLengthRecordReader(record_bytes=(32 * 32 * 3 + 1))
    key, value = reader.read(file_queue)

    # 3、解碼數據(二進制數據)
    # 樣本數據集根據具體數據處理,這里的數據為第一個數據為目標值,后面的為圖片數據
    target_image = tf.decode_raw(value, tf.uint8)

    # 4、分割數據
    target = tf.slice(target_image, [0], [1])
    image = tf.slice(target_image, [1], [32 * 32 * 3])

    # 5、特征數據形狀改變
    new_image = tf.reshape(image, [32, 32, 3])
    print(new_image)

    # 6、批處理
    image_batch, target_batch = tf.train.batch([new_image, target], batch_size=10, capacity=10)
    print(image_batch, target_batch)

    # 7、tf文件寫入
    with tf.Session() as sess:
        if not os.path.exists("data/tf_records/cifar.tfrecords"):
            # 1)存進tfRecords文件
            print("開始存儲")
            with tf.python_io.TFRecordWriter(path="data/tf_records/cifar.tfrecords") as writer:
                # 2)循環次數為批次數
                for i in range(10):
                    # 獲取對應值
                    image_data = image_batch[i].eval().tostring()
                    target_data = int(target_batch[i].eval()[0])
                    # 3)產生實例
                    example = tf.train.Example(features=tf.train.Features(feature={
                        "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_data])),
                        "target": tf.train.Feature(int64_list=tf.train.Int64List(value=[target_data]))
                    }))
                    # 4)寫入數據
                    writer.write(example.SerializeToString())
            print("結束存儲")

    # 8、tf文件讀取
    # 1)讀取tfRecords文件
    tf_queue = tf.train.string_input_producer(["data/tf_records/cifar.tfrecords"])

    # 2)讀取數據
    tf_reader = tf.TFRecordReader()
    key, value = tf_reader.read(tf_queue)

    # 3)解析example
    features = tf.parse_single_example(value, features={
        "image": tf.FixedLenFeature([], dtype=tf.string),
        "target": tf.FixedLenFeature([], dtype=tf.int64)
    })
    print(features["image"], features["target"])

    # 4)解碼數據
    image = tf.decode_raw(features["image"], tf.uint8)
    image_reshape = tf.reshape(image, [32, 32, 3])
    target = tf.cast(features["target"], tf.int32)
    print(image_reshape, target)
    # 5)批處理
    image_batch, target_batch = tf.train.batch([image_reshape, target], batch_size=10, capacity=10)

    # 9、運行
    with tf.Session() as sess:
        # 線程協調器
        coord = tf.train.Coordinator()
        # 線程運行
        threads = tf.train.start_queue_runners(sess, coord=coord)

        # tf文件讀取
        print(sess.run([image_batch, target_batch]))

        # 子線程回收
        coord.request_stop()
        coord.join(threads)

  八、總結,說起來文件讀取只是讀取各種數據樣本的開始,這里的幾種讀取方式基本上就是常用的幾種形式了。目的是認識常規數據讀取的方式。

    但是這里要說明:現在處理數據的方式一般采用tf.data的api來進行數據的處理和調整。所以需要把精力放在tf.data上面。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM