一、我們都知道Python由於GIL的原因導致多線程並不是真正意義上的多線程。但是TensorFlow在做多線程使用的時候是吧GIL鎖放開了的。所以TensorFlow是真正意義上的多線程。這里我們主要是介紹queue式的多線程運行方式。
二、了解一下TensorFlow多線程queue的使用過程
tensorflow: 多線程是真正的多線程執行。 隊列: tf.FIFOQueue(<capacity>, <dtypes>, <name>), 先進先出 tf.RandomShuffleQueue, 隨機出隊列 多線程: 當數據量很大時,入隊操作從硬盤中讀取,放入內存。主線程需要等待操作完成,才能訓練。 使用多線程,可以達到邊處理,邊訓練的異步效果。 隊列管理器(棄用): tf.train.QueueRunner(<queue>, <enqueue_ops>) enqueue_ops: 添加線程的隊列操作列表[]*2為開啟2個線程,[]內為操作部分 method: create_threads(<sess>, <coord>, <start>): 創建線程來運行給定的入隊操作。 start: 布爾值,是否啟動線程 coord: 線程協調器 return: 線程實例 線程協調器: 協調線程之間終止
注意:這里使用的是TensorFlow1.0版本,在后續的版本中基本 把這種方式廢棄了。但是這里為了好的了解文件讀取的方式,我們使用queue式的多線程來執行。
import tensorflow as tf def queue_demo(): # 1、聲明隊列 queue = tf.FIFOQueue(3, dtypes=tf.float32) # 2、加入數據 init_queue = queue.enqueue_many([[0.1, 0.2, 0.3]]) # 3、取出數據 data = queue.dequeue() # 4、處理數據 en_queue = queue.enqueue(data + 1) with tf.Session() as sess: # 初始化操作 sess.run(init_queue) # 循環 for i in range(10): sess.run(en_queue) for i in range(queue.size().eval()): print(queue.dequeue().eval())
import tensorflow as tf def queue_thread_demo(): # 1、聲明隊列 queue = tf.FIFOQueue(100, dtypes=tf.float32) # 2、加入數據 for i in range(100): queue.enqueue((i + 1)/100) # 3、操作 data = queue.dequeue() en_queue = queue.enqueue(data + 1) # 3、定義隊列管理器 qr = tf.train.QueueRunner(queue, enqueue_ops=[en_queue] * 2) with tf.Session() as sess: # 開啟線程協調器 coord = tf.train.Coordinator() # 開啟線程 threads = qr.create_threads(sess, coord=coord, start=True) for i in range(100): print(sess.run(queue.dequeue())) # 注:沒有線程協調器,主線程結束,會結束session,導致異常。 coord.request_stop() coord.join(threads)
三、了解基本的數據讀取過程和api
文件io: 1、csv文件讀取一行 2、二進制文件指定bytes 3、圖片文件一張一張 流程: 1、構造一個文件隊列 2、讀取文件內容 3、解碼文件內容 4、批處理 api: 1、文件隊列構造 tf.train.string_input_producer(<string_tensor>, <shuffle=True>) string_tensor: 含有文件名的一階張量 num_epochs: 過幾遍數據,默認無數遍 2、文件閱讀器 tf.TextLineReader、csv文件格式類型 tf.FixedLengthRecordReader(record_bytes)、讀取固定值的二進制文件 tf.TFRecordReader、讀取TfRecords 共同: read(file_queue): 隊列中指定數量 return: Tensors 元組(key:文件名, value默認行內容) 3、文件解碼器: tf.decode_csv(<records>, <record_defaults=None>, <field_delim=None>, <name=None>) 將CSV轉換為張量,與tf.TextLineReader搭配使用 records: tensor型字符串,每一個字符串為CSV中的記錄 record_defaults: 參數決定了所有張量的類型,並設置一個值在輸入字符串中缺少使用默認值 tf.decode_raw(<bytes>, <out_type>, <little_endian=None>, <name=None>) 將字節轉換為一個向量表示,字節為一字符串類型的張量,與函數tf.FixedLengthRecordReader搭配使用,二進制讀取為utf-8格式
在讀取文件之間先了解批處理的作用,主要是講每次讀出來的數據,緩存,然后到達一個批次,統一訓練
管道讀端批處理: tf.train.batch(<tensors>, <batch_size>, <num_threads=1>, <capacity=32>, <name=None>) tensors: 張量列表 tf.train.shuffle_batch(<tensors>, <batch_size>, <capacity>, <min_dequeue>) min_dequeue: 留下隊列里的張量個數,能夠保持隨機打亂
四、csv文件讀取
csv文件讀取: 1、找到文件,構建列表 2、構造文件隊列 3、構造閱讀器,讀取隊列內容 4、解碼內容 5、批處理
import os import tensorflow as tf def csv_io(): # 1、找到文件,加入隊列 file_names = os.listdir("data/csv") file_list = [os.path.join("data/csv", file_name) for file_name in file_names] file_queue = tf.train.string_input_producer(file_list) # 2、讀取一行數據 reader = tf.TextLineReader() key, value = reader.read(file_queue) # 3、解碼csv records = [[-1], [-1]] num1, num2 = tf.decode_csv(value, record_defaults=records) # 4、批處理 num1_batch, num2_batch = tf.train.batch([num1, num2], batch_size=9, num_threads=1, capacity=9) with tf.Session() as sess: # 加入線程協調器 coord = tf.train.Coordinator() # 線程運行 threads = tf.train.start_queue_runners(sess, coord=coord) print(sess.run([num1_batch, num2_batch])) # 子線程回收 coord.request_stop() coord.join(threads)
五、圖片文件讀取
圖片讀取: 每一個樣本必須保證特征數量一樣 特征值:像素值 單通道:灰度值(黑白圖片,像素中只有一個值) 三通道:RGB(每個像素都有3個值) 三要素:長度寬度、通道值 圖像的基本操作: 目的: 1、增加圖片數據的統一性 2、所有圖片裝換成指定大小 3、縮小圖片數據量,防止增加開銷 操作: 縮小圖片大小 api: 圖片縮放: tf.image.resize_images(<images>, <size>) <images>:4-D形狀[batch, height, width, channels]/3-D[height, width, channels] <size>:1-D int32張量:new_height, new_width, 圖像的新尺寸 return:4-D/3-D格式圖片 圖片讀取api: tf.WholeFileReader: 將文件的全部內容作為輸入的讀取器 return:讀取器實例 read(<file_queue>):輸出將一個文件名(key)和該文件的內容值 圖像解碼器: tf.image.decode_jpeg(<contents>): 將JPEG編碼的圖像解碼為unit8張量 return:uint8張量,3-D形狀[height, width, channels] tf.image.decode_png(): 將PNG編碼的圖像解碼為uint8/uint16的張量 return:張量類型,3-D[height, width, channels]
import os import tensorflow as tf def image_io(): # 1、讀取文件放入隊列 image_names = os.listdir("data/image") image_files = [os.path.join("data/image", image_name) for image_name in image_names] image_queue = tf.train.string_input_producer(image_files) # 2、讀取一張圖片數據 reader = tf.WholeFileReader() # value:一整張圖片的數據 key, value = reader.read(image_queue) # 3、解碼 image = tf.image.decode_jpeg(value) print(image) # 4、處理圖片的大小 new_image = tf.image.resize_images(image, [350, 350]) print(new_image) # 注意一定要固定形狀,批處理的時候所有數據必須固定 new_image.set_shape([350, 350, 3]) print(new_image) # 5、批處理 image_batch = tf.train.batch([new_image], batch_size=2, num_threads=1, capacity=2) # 6、運行 with tf.Session() as sess: # 加入線程協調器 coord = tf.train.Coordinator() # 線程運行 threads = tf.train.start_queue_runners(sess, coord=coord) print(sess.run([image_batch])) # 子線程回收 coord.request_stop() coord.join(threads)
六、二進制文件讀取
二進制文件讀取: api: tf.FixedLengthRecordReader(<record_bytes>) record_bytes:數據長度 解碼器: tf.decode_raw(<bytes>, <out_type>, <little_endian=None>, <name=None>) bytes:數據 out_type:輸出類型
import os import tensorflow as tf def cifar_io(): # 1、讀取文件加入隊列 cifar_names = os.listdir("data/cifar") cifar_files = [os.path.join("data/cifar", cifar_name) for cifar_name in cifar_names if cifar_name.endswith(".bin") and cifar_name != "test_batch.bin"] file_queue = tf.train.string_input_producer(cifar_files) # 2、讀取二進制文件 reader = tf.FixedLengthRecordReader(record_bytes=(32 * 32 * 3 + 1)) key, value = reader.read(file_queue) # 3、解碼數據(二進制數據) # 樣本數據集根據具體數據處理,這里的數據為第一個數據為目標值,后面的為圖片數據 target_image = tf.decode_raw(value, tf.uint8) # 4、分割數據 target = tf.slice(target_image, [0], [1]) image = tf.slice(target_image, [1], [32 * 32 * 3]) # 5、特征數據形狀改變 new_image = tf.reshape(image, [32, 32, 3]) print(new_image) # 6、批處理 image_batch, target_batch = tf.train.batch([new_image, target], batch_size=10, capacity=10) print(image_batch, target_batch) # 7、運行 with tf.Session() as sess: # 線程協調器 coord = tf.train.Coordinator() # 線程運行 threads = tf.train.start_queue_runners(sess, coord=coord) print(sess.run([image_batch, target_batch])) # 子線程回收 coord.request_stop() coord.join(threads)
七、上面說完了,常用文件讀取的方式,下面說一下TensorFlow文件的存儲與讀取的方式。TensorFlow一般采用*.threcords文件格式進行保存。它是一種內置文件格式,是一種二進制文件,它可以更好的利用內存,更方便的復制和移動。
tf.TFRecordReader 一種內置文件格式,是一種二進制文件,它可以更好的利用內存,更方便的復制和移動 為了將二進制數據和標簽(訓練類別標簽),數據存儲在同一文件中 分析、存取 文件格式:*.threcords 寫入文件內容:example協議塊 TF存儲: TFRecord存儲器 tf.python_io.TFRecordWriter(<path>) method: write(record) close Example協議塊: tf.train.Example(<features=None>) features:tf.train.Features(<feature=None>)實例 feature:字典數據,key為要保存的數據 tf.train.Feature(<**options>) **options: tf.train.ByteList(<value=[Bytes]>) tf.train.IntList(<value=[Value]>) tf.train.FloatList(<value=[Value]>) return:Features實例 return:Example協議塊 TF讀取: tf.parse_example(<serialized>, <features=None>, <name=None>) serialized:標量字符串Tensor,一個序列化的Example features:dict字典數據,鍵為讀取的名字,值為FixedLenFeature return:一個鍵值對組成的字典,鍵為讀取的名字 tf.FixedLenFeature(<shape>, <dtype>) shape:形狀 dtype:數據類型(float32/int64/string)
import os import tensorflow as tf def tf_records_io(): # 1、讀取文件加入隊列 cifar_names = os.listdir("data/cifar") cifar_files = [os.path.join("data/cifar", cifar_name) for cifar_name in cifar_names if cifar_name.endswith(".bin") and cifar_name != "test_batch.bin"] file_queue = tf.train.string_input_producer(cifar_files) # 2、讀取二進制文件 reader = tf.FixedLengthRecordReader(record_bytes=(32 * 32 * 3 + 1)) key, value = reader.read(file_queue) # 3、解碼數據(二進制數據) # 樣本數據集根據具體數據處理,這里的數據為第一個數據為目標值,后面的為圖片數據 target_image = tf.decode_raw(value, tf.uint8) # 4、分割數據 target = tf.slice(target_image, [0], [1]) image = tf.slice(target_image, [1], [32 * 32 * 3]) # 5、特征數據形狀改變 new_image = tf.reshape(image, [32, 32, 3]) print(new_image) # 6、批處理 image_batch, target_batch = tf.train.batch([new_image, target], batch_size=10, capacity=10) print(image_batch, target_batch) # 7、tf文件寫入 with tf.Session() as sess: if not os.path.exists("data/tf_records/cifar.tfrecords"): # 1)存進tfRecords文件 print("開始存儲") with tf.python_io.TFRecordWriter(path="data/tf_records/cifar.tfrecords") as writer: # 2)循環次數為批次數 for i in range(10): # 獲取對應值 image_data = image_batch[i].eval().tostring() target_data = int(target_batch[i].eval()[0]) # 3)產生實例 example = tf.train.Example(features=tf.train.Features(feature={ "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_data])), "target": tf.train.Feature(int64_list=tf.train.Int64List(value=[target_data])) })) # 4)寫入數據 writer.write(example.SerializeToString()) print("結束存儲") # 8、tf文件讀取 # 1)讀取tfRecords文件 tf_queue = tf.train.string_input_producer(["data/tf_records/cifar.tfrecords"]) # 2)讀取數據 tf_reader = tf.TFRecordReader() key, value = tf_reader.read(tf_queue) # 3)解析example features = tf.parse_single_example(value, features={ "image": tf.FixedLenFeature([], dtype=tf.string), "target": tf.FixedLenFeature([], dtype=tf.int64) }) print(features["image"], features["target"]) # 4)解碼數據 image = tf.decode_raw(features["image"], tf.uint8) image_reshape = tf.reshape(image, [32, 32, 3]) target = tf.cast(features["target"], tf.int32) print(image_reshape, target) # 5)批處理 image_batch, target_batch = tf.train.batch([image_reshape, target], batch_size=10, capacity=10) # 9、運行 with tf.Session() as sess: # 線程協調器 coord = tf.train.Coordinator() # 線程運行 threads = tf.train.start_queue_runners(sess, coord=coord) # tf文件讀取 print(sess.run([image_batch, target_batch])) # 子線程回收 coord.request_stop() coord.join(threads)
八、總結,說起來文件讀取只是讀取各種數據樣本的開始,這里的幾種讀取方式基本上就是常用的幾種形式了。目的是認識常規數據讀取的方式。
但是這里要說明:現在處理數據的方式一般采用tf.data的api來進行數據的處理和調整。所以需要把精力放在tf.data上面。