tensorflow文件讀取


1、知識點

"""
注意:在tensorflow當中,運行操作具有依賴性

1、CPU操作計算與IO計算區別:
        CPU操作:
            1、tensorflow是一個正真的多線程,並行的執行任務
            2、使用tfrecords對文件讀取進行改善
            
        IO操作:
            1、一次性讀取數據,消耗內存
            2、一次性進行訓練
        
2、隊列API:        
        1、tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 先進先出隊列,按順序出隊列
                capacity:整數。可能存儲在此隊列中的元素數量的上限
                dtypes:DType對象列表。長度dtypes必須等於每個隊列元素中的張量數,dtype的類型形狀,決定了后面進隊列元素形狀
                return:返回一個進隊列操作
                        dequeue(name=None) #從隊列獲取一個數據
                        enqueue(vals, name=None) #將數據存放在隊列
                        enqueue_many(vals, name=None):放入數據,其中vals列表或者元組
    
        2、tf.RandomShuffleQueue 隨機出隊列
        
3、隊列管理器:qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q*2])
            qr.create_threads(sess,start=True)    #開啟子線程    

4、線程協調器:tf.train.Coordinator() ,線程協調員,實現一個簡單的機制來協調一組線程的終止
            返回對象方法:
                request_stop() 
                should_stop() 檢查是否要求停止
                join(threads=None, stop_grace_period_secs=120)  等待線程終止
                return:線程協調員實例

5、CSV文件讀取步驟:
        1、先找到文件,構造一個列表 
            file_name = os.listdir("./csvData/")
            file_list = [os.path.join(file) for file in file_name]
        2、構造文件列隊
            
        3、構造閱讀器,讀取隊列內容(一行)
        4、解碼內容
        5、批處理(多個樣本)

6、文件讀取API-文件隊列構造:tf.train.string_input_producer(string_tensor,,shuffle=True) 將輸出字符串(例如文件名)輸入到管道隊列
         參數:   
            string_tensor    含有文件名的1階張量
            num_epochs:過幾遍數據,默認無限過數據
            return:具有輸出字符串的隊列
            
7、文件讀取API-文件閱讀器:根據文件格式,選擇對應的文件閱讀器
    a) class tf.TextLineReader() 閱讀文本文件逗號分隔值(CSV)格式,默認按行讀取
            return:讀取器實例
    b) tf.FixedLengthRecordReader(record_bytes)要讀取每個記錄是固定數量字節的二進制文件
            record_bytes:整型,指定每次讀取的字節數
            return:讀取器實例
    c) tf.TFRecordReader    讀取TfRecords文件
    共同的讀取方法:read(file_queue):從隊列中指定數量內容 ,返回一個Tensors元組(key文件名字,value默認的內容(行,字節))

8、文件讀取API-文件內容解碼器:由於從文件中讀取的是字符串,需要函數去解析這些字符串到張量
    a) tf.decode_csv(records,record_defaults=None,field_delim = None,name = None)  將CSV轉換為張量,與tf.TextLineReader搭配使用
        records:tensor型字符串,每個字符串是csv中的記錄行
        field_delim:默認分割符”,”
        record_defaults:參數決定了所得張量的類型,並設置一個值在輸入字符串中缺少使用默認值,如
    b) tf.decode_raw(bytes,out_type,little_endian = None,name = None) 
        將字節轉換為一個數字向量表示,字節為一字符串類型的張量,與函數tf.FixedLengthRecordReader搭配使用,二進制讀取為uint8格式

9、開啟線程操作
    tf.train.start_queue_runners(sess=None,coord=None) 收集所有圖中的隊列線程,並啟動線程
            sess:所在的會話中
            coord:線程協調器
            return:返回所有線程隊列

9、管道讀端批處理:
    a) tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 讀取指定大小(個數)的張量
            tensors:可以是包含張量的列表
            batch_size:從隊列中讀取的批處理大小
            num_threads:進入隊列的線程數
            capacity:整數,隊列中元素的最大數量
            return:tensors
    b) tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,num_threads=1,) 亂序讀取指定大小(個數)的張量
            min_after_dequeue:留下隊列里的張量個數,能夠保持隨機打亂

10、錯誤
    OutOfRangeError (see above for traceback): FIFOQueue '_1_batch/fifo_queue' is closed and has insufficient elements (requested 9, current size 0)
    解決方法:由於從上可知需要9個數據,但是讀取為0,因此可能是數據有問題,即數據文件或者讀取路徑有問題
"""

2、代碼

# coding = utf-8

import tensorflow as tf
import  os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

def readCSVFile(filelist):
    """
    讀取CSV文件
    :param filelist:  文件路徑+名字列表
    :return: 讀取的內容
    """
    #1、構造文件隊列
    file_queue = tf.train.string_input_producer(file_list)
    #2、構造CSV閱讀器讀取隊列數據(讀一行)
    reader = tf.TextLineReader()
    key , value = reader.read(file_queue)

    #3、對每行內容解碼
    #record_defaults:指定每一個樣本的每一列的類型,指定默認值[["None"], [4.0]]
    records =[["None"],["None"]]
    example , label = tf.decode_csv(value,record_defaults=records)
    #批處理大小跟隊列、數據的數量沒有影響,只決定這批次取多少數據batch_size

    ##############批處理####################
   #讀取多個數據,就需要使用批處理
    example_batch,label_batch = tf.train.batch([example,label],batch_size=20,num_threads=1,capacity=90)
    print(example_batch, label_batch)
    return example_batch,label_batch
    #return example,label


############隊列################
def queue():
    #1、首先定義數據
    Q = tf.FIFOQueue(3,tf.float32)

    #2、放入數據
    enq_many = Q.enqueue_many([[0.1,0.2,0.3],])

    #定義一些數據處理的邏輯
    out_q = Q.dequeue()
    out_q = out_q + 1
    en_q = Q.enqueue(out_q)

    #運行會話
    with tf.Session() as sess:
        #初始化隊列
        sess.run(enq_many)
        #處理數據
        for i in range(100):
            sess.run(en_q)
        for i in range(Q.size().eval()):
            print(sess.run(Q.dequeue()))
    return None


#############異步執行#########################
def unasynQueue():
    """
    異步讀取
    :return:
    """
    #1、定義一個隊列,1000
    Q = tf.FIFOQueue(1000,tf.float32)
    #2、定義要做的事,並放入隊列中
    var = tf.Variable(0.0)
    #實現自增
    data = tf.assign_add(var,tf.constant(1.0))
    en_q = Q.enqueue(data)
    #3、定義隊列管理器,指定多少個子線程,子線程做事
    qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q,]*2)

    #初始化變量OP
    init_op = tf.global_variables_initializer()
    with tf.Session() as sess:
        sess.run(init_op)

        # 開啟線程管理器
        coord = tf.train.Coordinator()
        #開啟子線程
        threads = qr.create_threads(sess,coord,start=True)
        #主線程不斷讀取數據
        for i in range(300):
            print(sess.run(Q.dequeue()))

        #回收線程
        coord.request_stop()
        coord.join(threads)
    return  None

if __name__ == '__main__':
    file_name = os.listdir("./csvData/")
    file_list = [os.path.join("./csvData/",file) for file in file_name]
    example_batch ,label_batch = readCSVFile(file_list)
    with tf.Session() as sess:
        # #定義一個線程協調器
        coord = tf.train.Coordinator()
        # #開啟讀文件的線程
        threads = tf.train.start_queue_runners(sess,coord=coord)
        #打印讀取的內容
        print(sess.run([example_batch,label_batch]))
        #回收線程
        coord.request_stop()
        coord.join(threads)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM