一、TFRecord數據格式
對於深度學習的物體檢測等任務,比較常用的數據集是PASCAL VOC數據集。對於PASCAL VOC數據集,主要有兩個文件夾,分別為:Annotations和JPEGImages。其中,Annotations文件夾存儲了圖片物體的label,格式為XML格式,而JPEGImages文件夾下存放的是圖片。這種數據集需要將圖像數據和標簽分開放置,比較不友好。
對於使用tensorflow框架進行訓練深度神經網絡時,tensorflow提供了對圖像數據進行處理的工具——TFRecord。其將數據標簽和圖像數據存儲在一起,方便管理。
TFRecord通過tf.train.Example Protocol Buffer協議進行數據存儲的。tf.train.Example的定義如下:
message Example { Features features = 1; }; message Features { map<string, Feature> feature = 1; }; message Feature { oneof kind { BytesList bytes_list = 1; FloatList float_list =2; Int64List int64_list = 3; } };
tf.train.Example包含了一個從屬性到取值的字典。屬性的取值可以為字符串、實數列表和整數列表。
二、圖像預處理
對於圖像預處理,tensorflow提供了幾種方法,主要包括:
1、調整圖像大小
tf.image.resize_images
例如:resized = tf.image.resize_images(img_data, [300, 300], method=0)
tf.image.resize_images函數的method參數對應不同的圖像調整算法,如下圖所示;
method取值 | 圖像大小調整算法 |
0 | 雙線性插值法 |
1 | 最近鄰居法 |
2 | 雙三次插值法 |
3 | 面積插值法 |
不同算法調整圖像大小的結果差不多。除了將整張圖像信息完整保存,tensorflow還支持對圖像進行裁剪或者填充(tf.image.crop_to_bounding_box函數、tf.image.pad_to_bounding_box和tf.image.resize_image_with_crop_or_pad函數)。
2、圖像色彩調整
tf.image.adjust_brightness、tf.image.adjust_contrast、tf.image.adjust_hue、tf.image.adjust_saturation
3、處理標注框
tf.image.draw_bounding_box
4、隨機截取圖像
tf.image.sample_distorted_bounding_box
三、多線程輸入數據處理框架
對於第二節中tensorflow提供的數據預處理方法,雖然可以減小無關因素對圖像識別模型的效果的影響,但是這些預處理方式比較復雜,會減慢整個訓練的過程。因此,為了避免圖像預處理稱為神經網絡模型訓練效率的瓶頸,tensorflow提供了一套多線程處理輸入數據的框架。其主要的流程圖如下:
3.1隊列和多線程
在tensorflow中,隊列和變量類似,都是計算圖上有狀態的節點。其他的計算節點可以修改他們的狀態。對於變量,可以通過賦值操作修改變量的值,而對於隊列而言,修改隊列的狀態的操作有:Enqueue、EnqueueMany和Dequeue。
tensorflow提供了tf.Coordinator和tf.QueueRunner兩個類完成多線程協同的功能。tf.Coordinator主要用於協同多個線程一起停止,並提供了should_stop、request_stop和join三個函數。在啟動線程之前需要先聲明一個tf.Coordinator類,並將這個類傳入每一個創建的線程中。啟動的線程需要一直查詢tf.Coordinator類中提供的should_stop函數,當這個函數的返回值是True時,則當前的線程需要退出。當某一個線程調用request_stop函數之后,should_stop函數的返回值將被設置為True,這樣所有的線程就可以同時終止了。
實例:
import tensorflow as tf import numpy as np import threading # 線程中運行的程序,這個程序每隔1秒判斷是否需要停止並打印自己的ID def MyLoop(coord, worker_id): # 使用tf.Coordinator類提供的協同工具判斷當前進程是否需要停止 while not coord.should_stop(): # 隨機停止所有線程 if np.random.rand() < 0.1: print("Stoping from id: %d\n" % worker_id) coord.request_stop() else: print("working on id: %d\n" %worker_id) # 聲明一個tf.train.Coordinator類來協同多個線程 coord = tf.train.Coordinator() # 創建5個線程 threads = [threading.Thread(target=MyLoop, args=(coord, i)) for i in range(5)] # 啟動所有線程 for t in threads: t.start() # 等待所有線程退出 coord.join()
3.2輸入文件隊列
本節介紹使用tensorflow中的隊列管理輸入文件列表。
為了提高處理效率,當訓練的數據量較大時,通常將數據分成多個TFRecord文件保存。tensorflow提供了獲取符合正則表達式獲取文件的函數:tf.train.match_filenames_once。對於得到的文件列表,可以通過tf.train.string_input_producer函數進行管理。
tf.train.string_input_producer函數會使用初始化時提供的文件列表創建一個輸入隊列,輸入隊列中原始元素為文件列中的所有文件。
實例:
1、將數據轉換成TFRecord格式,寫入文件
import tensorflow as tf # 創建TFRecord文件的幫助函數 def _int64_feature(value): return tf.train.Feature(int64_list=tf.train.Int64List(value=value)) # 寫入數據 num_shards = 2 instances_per_shard = 2 # 寫入數據,文件后綴為:000n-of-000m,m為訓練數據總的被存放在了多少個文件,n為當前的文件編號 # 這種命名方式,方便了使用正則表達式獲取文件列表 for i in range(num_shards): filename = ('/path/to/data.tfrecords-%.5d-of-%.5d' % (i, num_shards)) writer = tf.python_io.TFRecordWriter(filename) # 將數據封裝成Example結構並寫入TFRecord文件 for j in range(instances_per_shard): example = tf.train.Example(features=tf.train.Features(feature={ 'i': _int64_feature(i), 'j': _int64_feature(j)})) writer.write(example.SerializeToString()) writer.close()
2、讀取TFRecord數據實例
import tensorflow as tf # 使用tf.train.match_filenames_once函數獲取文件列表 files = tf.train.match_filenames_once("/path/to/data.tfrecords-*") # 通過tf.train.string_input_producer函數創建輸入隊列,輸入隊列的文件列表為tf.train.match_filenames_once函數獲取文件列表 filename_queue = tf.train.string_input_producer(files, shuffle=True) # 解析一個樣本 reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) features = tf.parse_single_example(serialized_example, features={ 'i': tf.FixedLenFeature((), tf.int64), 'j': tf.FixedLenFeature((), tf.int64) }) with tf.Session() as sess: tf.local_variables_initializer().run() print(sess.run(files)) # 聲明tf.train.Coordinator類來協同不同的現場,並且啟動線程 coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) for i in range(6): print(sess.run([features['i'], features['j']])) coord.request_stop() coord.join(threads)
3.3組合訓練數據
通常在訓練神經網絡時,我們一次輸入一個batch的數據量,這樣可以提高訓練效率。所以得到單個樣例的預處理結果之后,還需要將它們組織成batch,然后輸入網絡的輸入層。tensorflow提供了這樣的函數:tf.train.batch以及tf.train.shuffle_batch。
實例:
import tensorflow as tf files = tf.train.match_filenames_once("/path/to/data.tfrecords-*") file_queue = tf.train.string_input_producer(files) # 解析一個樣本 reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) features = tf.parse_single_example(serialized_example, features={ 'i': tf.FixedLenFeature((), tf.int64), 'j': tf.FixedLenFeature((), tf.int64) }) example, label = features['i'], features['j'] batch_size = 3 capacity = 1000 + 3 *batch_size example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size capacity=capacity) with tf.Session() as sess: tf.global_variables_initializer().run() coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) for i in range(2): cur_example_batch, cur_label_batch = sess.run( [example_batch, label_batch]) print(cur_example_batch, cur_label_batch) coord.request_stop() coord.join(threads)
其中,調用tf.train.start_queue_runners, 啟動入隊線程,由多個或單個線程,按照設定規則,把文件讀入Filename Queue中。函數返回線程ID的列表,一般情況下,系統有多少個核,就會啟動多少個入隊線程(入隊具體使用多少個線程在tf.train.batch中定義);
(參考:https://blog.csdn.net/dcrmg/article/details/79780331)
3.4高層處理框架slim
除了使用tf.train提供的方法進行獲取批量的訓練TFRecord數據,對於tensorflow的高層封裝tf.slim處理。
通過slim讀取TFRecord數據,主要分為以下幾個步驟:
1、定義解碼器decoder
decoder = tf.slim.tfexample_decoder.TFExampleDecoder()
其中,定義解碼器時,需要制定兩個參數:keys_to_features,和items_to_handlers兩個字典參數。key_to_features這個字典需要和TFrecord文件中定義的字典項匹配。items_to_handlers中的關鍵字可以是任意值,但是它的handler的初始化參數必須要來自於keys_to_features中的關鍵字。
2、定義dataset
dataset= tf.slim.dataset.Dataset()
其中,定義dataset時需要將datasetsource、reader、decoder、num_samples等參數
3、定義provider
provider = slim.dataset_data_provider.DatasetDataProvider
其中,需要的參數為:dataset, num_readers, reader_kwargs, shuffle, num_epochs,common_queue_capacity,common_queue_min, record_key=',seed, scope等。
4、調用provider的get方法
獲取items_to_handlers中定義的關鍵字
5、利用分好的batch建立一個prefetch_queue
6、prefetch_queue中有一個dequeue的op,每執行一次dequeue則返回一個batch的數據。
(參考:
https://www.jianshu.com/p/63eb53cb5b73
https://blog.csdn.net/MOU_IT/article/details/82773839)
實例:
# 構造第一個參數:數據目錄+文件名
file_pattern = os.path.join(dataset_dir, self.param.FILE_PATTERN % train_or_test)
# 准備第二個參數:
reader = tf.TFRecordReader
# 准備第三個參數:decoder
# 反序列化的格式
keys_to_features = {
'image/encoded': tf.FixedLenFeature((), tf.string, default_value=''),
'image/format': tf.FixedLenFeature((), tf.string, default_value='jpeg'),
'image/height': tf.FixedLenFeature([1], tf.int64),
'image/width': tf.FixedLenFeature([1], tf.int64),
'image/channels': tf.FixedLenFeature([1], tf.int64),
'image/shape': tf.FixedLenFeature([3], tf.int64),
'image/object/bbox/xmin': tf.VarLenFeature(dtype=tf.float32),
'image/object/bbox/ymin': tf.VarLenFeature(dtype=tf.float32),
'image/object/bbox/xmax': tf.VarLenFeature(dtype=tf.float32),
'image/object/bbox/ymax': tf.VarLenFeature(dtype=tf.float32),
'image/object/bbox/label': tf.VarLenFeature(dtype=tf.int64),
'image/object/bbox/difficult': tf.VarLenFeature(dtype=tf.int64),
'image/object/bbox/truncated': tf.VarLenFeature(dtype=tf.int64),
}
# 2、反序列化成高級的格式
# 其中bbox框ymin [23] xmin [46],ymax [234] xmax[12]--->[23,46,234,13]
items_to_handlers = {
'image': slim.tfexample_decoder.Image('image/encoded', 'image/format'),
'shape': slim.tfexample_decoder.Tensor('image/shape'),
'object/bbox': slim.tfexample_decoder.BoundingBox(
['ymin', 'xmin', 'ymax', 'xmax'], 'image/object/bbox/'),
'object/label': slim.tfexample_decoder.Tensor('image/object/bbox/label'),
'object/difficult': slim.tfexample_decoder.Tensor('image/object/bbox/difficult'),
'object/truncated': slim.tfexample_decoder.Tensor('image/object/bbox/truncated'),
}
# 構造decoder
decoder = slim.tfexample_decoder.TFExampleDecoder(keys_to_features, items_to_handlers)
dataset = slim.dataset.Dataset(
data_sources=file_pattern,
reader=reader,
decoder=decoder,
num_samples=self.param.SPLITS_TO_SIZES[train_or_test],
items_to_descriptions=self.param.ITEMS_TO_DESCRIPTIONS,
num_classes=self.param.NUM_CLASSES)
provider = slim.dataset_data_provider.DatasetDataProvider(
dataset,
num_readers=4,
common_queue_capacity=20 * FLAGS.batch_size,
common_queue_min=10 * FLAGS.batch_size,
shuffle=True
)
# 通過get獲取數據,獲取到的數據是單個數據,還需要對數據進行預處理,組合數據
# 真正獲取參數
[image, shape, glabels, gbboxes] = provider.get(['image', 'shape', 'object/label', 'object/bbox'])
# 直接進行數據預處理
# image [?, ?, 3]---->[300, 300, 3]
image, glabels, gbboxes = image_preprocessing_fn(image, glabels, gbboxes,
out_shape=ssd_shape,
data_format=DATA_FORMAT)
# 特征值、目標
# 批處理以及隊列處理
# tensor_list:tensor列表 [tensor, tensor, ]
# tf.train.batch(tensor_list, batch_size, num_threads, capacity)
# [Tensor, [6], [6], [6]] 嵌套的列表要轉換成單列表形式
r = tf.train.batch(train_tools.reshape_list([image, gclasses, glocalisations, gscores]),
batch_size=FLAGS.batch_size,
num_threads=4,
capacity=5 * FLAGS.batch_size)
# 批處理數據放入隊列
# 1個r:批處理的樣本, 5個設備,5個r, 5組32張圖片
# 隊列的目的是為了不同設備需求
#最后組裝好的數據出隊列dequeue
batch_queue = slim.prefetch_queue.prefetch_queue(r, capacity=deploy_config.num_clones)