1、知識點
""" 注意:在tensorflow當中,運行操作具有依賴性 1、CPU操作計算與IO計算區別: CPU操作: 1、tensorflow是一個正真的多線程,並行的執行任務 2、使用tfrecords對文件讀取進行改善 IO操作: 1、一次性讀取數據,消耗內存 2、一次性進行訓練 2、隊列API: 1、tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 先進先出隊列,按順序出隊列 capacity:整數。可能存儲在此隊列中的元素數量的上限 dtypes:DType對象列表。長度dtypes必須等於每個隊列元素中的張量數,dtype的類型形狀,決定了后面進隊列元素形狀 return:返回一個進隊列操作 dequeue(name=None) #從隊列獲取一個數據 enqueue(vals, name=None) #將數據存放在隊列 enqueue_many(vals, name=None):放入數據,其中vals列表或者元組 2、tf.RandomShuffleQueue 隨機出隊列 3、隊列管理器:qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q*2]) qr.create_threads(sess,start=True) #開啟子線程 4、線程協調器:tf.train.Coordinator() ,線程協調員,實現一個簡單的機制來協調一組線程的終止 返回對象方法: request_stop() should_stop() 檢查是否要求停止 join(threads=None, stop_grace_period_secs=120) 等待線程終止 return:線程協調員實例 5、CSV文件讀取步驟: 1、先找到文件,構造一個列表 file_name = os.listdir("./csvData/") file_list = [os.path.join(file) for file in file_name] 2、構造文件列隊 3、構造閱讀器,讀取隊列內容(一行) 4、解碼內容 5、批處理(多個樣本) 6、文件讀取API-文件隊列構造:tf.train.string_input_producer(string_tensor,,shuffle=True) 將輸出字符串(例如文件名)輸入到管道隊列 參數: string_tensor 含有文件名的1階張量 num_epochs:過幾遍數據,默認無限過數據 return:具有輸出字符串的隊列 7、文件讀取API-文件閱讀器:根據文件格式,選擇對應的文件閱讀器 a) class tf.TextLineReader() 閱讀文本文件逗號分隔值(CSV)格式,默認按行讀取 return:讀取器實例 b) tf.FixedLengthRecordReader(record_bytes)要讀取每個記錄是固定數量字節的二進制文件 record_bytes:整型,指定每次讀取的字節數 return:讀取器實例 c) tf.TFRecordReader 讀取TfRecords文件 共同的讀取方法:read(file_queue):從隊列中指定數量內容 ,返回一個Tensors元組(key文件名字,value默認的內容(行,字節)) 8、文件讀取API-文件內容解碼器:由於從文件中讀取的是字符串,需要函數去解析這些字符串到張量 a) tf.decode_csv(records,record_defaults=None,field_delim = None,name = None) 將CSV轉換為張量,與tf.TextLineReader搭配使用 records:tensor型字符串,每個字符串是csv中的記錄行 field_delim:默認分割符”,” record_defaults:參數決定了所得張量的類型,並設置一個值在輸入字符串中缺少使用默認值,如 b) tf.decode_raw(bytes,out_type,little_endian = None,name = None) 將字節轉換為一個數字向量表示,字節為一字符串類型的張量,與函數tf.FixedLengthRecordReader搭配使用,二進制讀取為uint8格式 9、開啟線程操作 tf.train.start_queue_runners(sess=None,coord=None) 收集所有圖中的隊列線程,並啟動線程 sess:所在的會話中 coord:線程協調器 return:返回所有線程隊列 9、管道讀端批處理: a) tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 讀取指定大小(個數)的張量 tensors:可以是包含張量的列表 batch_size:從隊列中讀取的批處理大小 num_threads:進入隊列的線程數 capacity:整數,隊列中元素的最大數量 return:tensors b) tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,num_threads=1,) 亂序讀取指定大小(個數)的張量 min_after_dequeue:留下隊列里的張量個數,能夠保持隨機打亂 10、錯誤 OutOfRangeError (see above for traceback): FIFOQueue '_1_batch/fifo_queue' is closed and has insufficient elements (requested 9, current size 0) 解決方法:由於從上可知需要9個數據,但是讀取為0,因此可能是數據有問題,即數據文件或者讀取路徑有問題 """
2、代碼
# coding = utf-8 import tensorflow as tf import os os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' def readCSVFile(filelist): """ 讀取CSV文件 :param filelist: 文件路徑+名字列表 :return: 讀取的內容 """ #1、構造文件隊列 file_queue = tf.train.string_input_producer(file_list) #2、構造CSV閱讀器讀取隊列數據(讀一行) reader = tf.TextLineReader() key , value = reader.read(file_queue) #3、對每行內容解碼 #record_defaults:指定每一個樣本的每一列的類型,指定默認值[["None"], [4.0]] records =[["None"],["None"]] example , label = tf.decode_csv(value,record_defaults=records) #批處理大小跟隊列、數據的數量沒有影響,只決定這批次取多少數據batch_size ##############批處理#################### #讀取多個數據,就需要使用批處理 example_batch,label_batch = tf.train.batch([example,label],batch_size=20,num_threads=1,capacity=90) print(example_batch, label_batch) return example_batch,label_batch #return example,label ############隊列################ def queue(): #1、首先定義數據 Q = tf.FIFOQueue(3,tf.float32) #2、放入數據 enq_many = Q.enqueue_many([[0.1,0.2,0.3],]) #定義一些數據處理的邏輯 out_q = Q.dequeue() out_q = out_q + 1 en_q = Q.enqueue(out_q) #運行會話 with tf.Session() as sess: #初始化隊列 sess.run(enq_many) #處理數據 for i in range(100): sess.run(en_q) for i in range(Q.size().eval()): print(sess.run(Q.dequeue())) return None #############異步執行######################### def unasynQueue(): """ 異步讀取 :return: """ #1、定義一個隊列,1000 Q = tf.FIFOQueue(1000,tf.float32) #2、定義要做的事,並放入隊列中 var = tf.Variable(0.0) #實現自增 data = tf.assign_add(var,tf.constant(1.0)) en_q = Q.enqueue(data) #3、定義隊列管理器,指定多少個子線程,子線程做事 qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q,]*2) #初始化變量OP init_op = tf.global_variables_initializer() with tf.Session() as sess: sess.run(init_op) # 開啟線程管理器 coord = tf.train.Coordinator() #開啟子線程 threads = qr.create_threads(sess,coord,start=True) #主線程不斷讀取數據 for i in range(300): print(sess.run(Q.dequeue())) #回收線程 coord.request_stop() coord.join(threads) return None if __name__ == '__main__': file_name = os.listdir("./csvData/") file_list = [os.path.join("./csvData/",file) for file in file_name] example_batch ,label_batch = readCSVFile(file_list) with tf.Session() as sess: # #定義一個線程協調器 coord = tf.train.Coordinator() # #開啟讀文件的線程 threads = tf.train.start_queue_runners(sess,coord=coord) #打印讀取的內容 print(sess.run([example_batch,label_batch])) #回收線程 coord.request_stop() coord.join(threads)