「導語」 TFRecord 是 TensorFlow 生態中的一個重要組件,它是一種二進制序列的存儲格式,使用該格式可以使輸入數據的讀取和處理更為高效,從而提升整體訓練流程的速度,另外,它還具有極高的靈活性,可以為復雜特征數據的構建與解析提供便利。本文將對 TFRecord 數據文件的生成與讀取流程進行詳細地介紹,並提供相應的示例代碼作為參考。
TFRecord 格式簡介
TFRecord 是 TensorFlow 生態中一個重要的組件,其本質上是一種文件格式,用於存儲二進制序列的內容。 TFRecord 文件由序列化后的 protobuf 數據構成,可以直接供 TensorFlow 程序讀取並用於模型訓練。
對於文本格式的數據文件而言,其存儲和讀取的開銷都比較大,相比之下, TFRecord 格式的數據文件占用的磁盤空間會更少,而且可以更高效地進行數據讀取,因此將數據使用 TFRecord 格式存儲能夠在一定程度上提升數據處理的效率。
另外,文本格式的數據更適合處理定長且維度單一的特征數據,對於變長以及多維度的特征數據的處理會比較麻煩,而 TFRecord 格式的數據則沒有這一限制,這給數據讀取與處理帶來了極大的靈活性。
TFRecord ProtoBuf
在生成或者讀取 TFRecord 文件之前,我們需要對與 TFRecord 格式相關的 protobuf 協議有一定的了解,它們是構成 TFRecord 文件的關鍵部分。
Example
Example 指一個數據輸入樣本,它由一系列的特征組成。其 protobuf 格式如下所示:
message Example { Features features = 1; } 復制代碼
上面的 Features 即指特征組合,其 protobuf 格式如下所示:
message Features { // Map from feature name to feature. map<string, Feature> feature = 1; } 復制代碼
這里 Features 使用 map 結構來進行存儲,其中 map 的 key 表示特征的名稱,為 string 類型, value 表示具體的特征值,為 Feature 類型。
另外,可以看到 Example 只是封裝了一下 Features 的結構,其實質與 Features 是等同的。
Feature
Feature 指具體的特征值,其 protobuf 格式如下所示:
message Feature { // Each feature can be exactly one kind. oneof kind { BytesList bytes_list = 1; FloatList float_list = 2; Int64List int64_list = 3; } } 復制代碼
從 Feature 的定義可以看出,它可以接收三種格式的數據,分別為:
-
BytesList格式,可以表示string或byte類型的數據。 -
FloatList格式,可以表示float和double類型的數據。 -
Int64List格式,可以表示bool類型、enum類型、int32類型、uint32類型、int64類型以及uint64等多種類型的數據。
以上三種格式,基本囊括了所有常見的數據輸入類型,在生成和解析 TFRecord 數據時,可以根據具體的數據類型來使用相應的 Feature 結構。
Python 實現
在將上述 protobuf 文件轉為語言相關的數據結構后,即可使用這些結構來構建特征數據並進行序列化了。下面以 python 語言為例,來介紹這些數據結構的具體使用方法。(注:在 TensorFlow 的 python 安裝包里已經包含了 Example 以及 Feature 等相關的數據結構,我們可以直接使用而無需再用 protoc 工具生成了。)
首先,我們需要構建 Feature 對象。因為 Feature 結構支持 3 種格式的數據,所以這里使用 3 個函數來分別生成不同類型的 Feature 對象。示例代碼如下所示:
import tensorflow as tf import numpy as np def _bytes_feature(value): """Returns a bytes_list from a string / byte.""" if isinstance(value, type(tf.constant(0))): # BytesList won't unpack a string from an EagerTensor. value = value.numpy() return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) def _float_feature(value): """Returns a float_list from a float / double.""" return tf.train.Feature(float_list=tf.train.FloatList(value=[value])) def _int64_feature(value): """Returns an int64_list from a bool / enum / int / uint.""" return tf.train.Feature(int64_list=tf.train.Int64List(value=[value])) # Print BytesList. print(_bytes_feature(b'test_string')) print(_bytes_feature(u'test_bytes'.encode('utf-8'))) # Print FloatList. print(_float_feature(np.exp(1))) # Print Int64List. print(_int64_feature(True)) print(_int64_feature(1)) # Serialize and Deserialize Feature. serialized_feature = _float_feature(np.exp(1)).SerializeToString() print(serialized_feature) feature_proto = tf.train.Feature.FromString(serialized_feature) print(feature_proto) 復制代碼
其中 _bytes_feature 、 _float_feature 和 _int64_feature 函數分別用來生成 BytesList 、 FloatList 和 Int64List 格式的 Feature 對象,這里假設這 3 個函數的參數 value 均為單個值。
由於 tf.train.*List 函數接收的參數均為列表 (list) 或數組 (array),所以上面的代碼中才會加上 [] 符號來表示列表,如果輸入的參數 value 已經為列表或數組則無需此操作。另外還需注意傳遞給 3 個函數的參數 value 的基礎類型要與具體的 *List 相匹配,否則會報錯。
在生成了 Feature 對象后,可以調用其 SerializeToString 方法對其進行序列化,從而生成序列化后的字符串。另外還可以使用 tf.train.Feature.FromString 方法來將序列化后的數據還原為 Feature 對象。
接着,我們就可以構建 Example 對象並對其進行序列化了。假設我們有 4 種 Feature ,它們分別為 boolean 類型的特征、 integer 類型的特征、 string 類型的特征和 float 類型的特征,我們首先將這 4 種特征通過上面 3 個函數編碼后返回相應的 Feature 對象,然后構建一個 Feature Map 字典並生成 Features 對象,最后使用 Features 對象生成 Example 對象並進行序列化。將上述流程統一到一個函數 serialize_example 中,其示例代碼如下所示:
def serialize_example(feature0, feature1, feature2, feature3): """ Creates a tf.train.Example message ready to be written to a file. """ # Create a dictionary mapping the feature name to the tf.train.Example-compatible feature = { 'feature0': _int64_feature(feature0), 'feature1': _int64_feature(feature1), 'feature2': _bytes_feature(feature2), 'feature3': _float_feature(feature3), } # Create a Features message using tf.train.Example. example_proto = tf.train.Example(features=tf.train.Features( feature=feature)) return example_proto.SerializeToString() # Serialize and Deserialize Example. serialized_example = serialize_example(False, 4, b'goat', 0.9876) print(serialized_example) example_proto = tf.train.Example.FromString(serialized_example) print(example_proto) 復制代碼
同樣地,可以調用 Example 對象的 SerializeToString 來將其序列化為字符串,調用 tf.train.Example.FromString 方法來將序列化后的 Example 對象還原。
TFRecord 文件生成
假設我們有 4 種類型的 Feature ,如上一節所述,並假設它們的原始數據 (numpy) 生成方式如下述代碼所示:
# The number of observations in the dataset. n_observations = int(1e4) # Boolean feature, encoded as False or True. feature0 = np.random.choice([False, True], n_observations) # Integer feature, random from 0 to 4. feature1 = np.random.randint(0, 5, n_observations) # String feature. strings = np.array([b'cat', b'dog', b'chicken', b'horse', b'goat']) feature2 = strings[feature1] # Float feature, from a standard normal distribution. feature3 = np.random.randn(n_observations) print(feature0, feature1, feature2, feature3) 復制代碼
現在我們要使用這 4 種 Feature 來生成一個包含 10,000 個數據樣本的 TFRecord 文件,可以使用以下幾種方式進行生成。
使用 tf.data 生成
首先使用 tf.data.Dataset.from_tensor_slices 函數來生成一個包含原始數據類型的 dateset ,代碼如下所示:
features_dataset = tf.data.Dataset.from_tensor_slices(
(feature0, feature1, feature2, feature3))
# Print dataset. print(features_dataset) # Print one element in dataset. for f0, f1, f2, f3 in features_dataset.take(1): print(f0, f1, f2, f3) 復制代碼
接着我們使用上一節定義的 serialize_example 函數來生成一個包含序列化字符串類型的 dataset ,代碼如下所示:
def generator(): for features in features_dataset: yield serialize_example(*features) serialized_features_dataset = tf.data.Dataset.from_generator( generator, output_types=tf.string, output_shapes=(), ) # Print serialized dataset. print(serialized_features_dataset) # Print one element in serialized_features_dataset. for s in serialized_features_dataset.take(1): print(s) 復制代碼
最后我們將序列化后的 dataset 寫入 TFRecord 文件中,代碼如下所示:
filename = 'train.tfrecord' writer = tf.data.experimental.TFRecordWriter(filename) writer.write(serialized_features_dataset) 復制代碼
注意這里的 writer 使用的是 tf.data.experimental.TFRecordWriter 對象,它專用於將序列化的 dataset 對象寫入到 TFRecord 文件中,要與后面介紹的 tf.io.TFRecordWriter 對象區分開來。
使用 tf.io 生成
首先將每個數據樣本都轉為 tf.train.Example 對象並序列化,然后再將其寫入 TFRecord 文件中,這里同樣使用上面介紹過的 serialize_example 函數來進行序列化,代碼如下所示:
# Write the `tf.train.Example` observations to the file. with tf.io.TFRecordWriter(filename) as writer: for i in range(n_observations): example = serialize_example( feature0[i], feature1[i], feature2[i], feature3[i], ) writer.write(example) 復制代碼
這里的 writer 使用的是 tf.io.TFRecordWriter 對象,它直接將序列化的字符串寫入到 TFRecord 文件中。
一般情況下,這種生成 TFRecord 文件的方式在 python 中是最常使用的,在實際使用中可以根據具體情況進行選擇。
使用 MapReduce 生成
在數據處理環節,我們可能會使用 MapReduce 進行一些預處理操作,同時我們也希望可以直接借助 MapReduce 任務來生成多個 TFRecord 數據文件以供分布式訓練使用,為了滿足這一需求, TensorFlow 生態提供了一個擴展庫 tensorflow-hadoop ,它包含了 TFRecord 格式的 MapReduce InputFormat 和 OutputFormat 實現。利用這個擴展庫,我們就可以直接使用 MapReduce 任務來生成和讀取 TFRecord 文件了。部分示例代碼如下所示:
// Main function. import org.tensorflow.hadoop.io.TFRecordFileOutputFormat; Job job = Job.getInstance(config, "TFRecord"); job.setOutputFormatClass(TFRecordFileOutputFormat.class); // Mapper or Reducer. import java.util.Arrays; import com.google.protobuf.ByteString; import org.tensorflow.example.BytesList; import org.tensorflow.example.Example; import org.tensorflow.example.Feature; import org.tensorflow.example.Features; import org.tensorflow.example.FloatList; import org.tensorflow.example.Int64List; // map or reduce function // *List value. Int64List value0 = Int64List.newBuilder().addAllValue(Arrays.asList(0L)).build(); Int64List value1 = Int64List.newBuilder().addAllValue(Arrays.asList(4L)).build(); BytesList value2 = BytesList.newBuilder() .addAllValue(Arrays.asList(ByteString.copyFrom("goat".getBytes()))).build(); FloatList value3 = FloatList.newBuilder().addAllValue(Arrays.asList(0.9876f)).build(); // All features. Feature feature0 = Feature.newBuilder().setInt64List(value0).build(); Feature feature1 = Feature.newBuilder().setInt64List(value1).build(); Feature feature2 = Feature.newBuilder().setBytesList(value2).build(); Feature feature3 = Feature.newBuilder().setFloatList(value3).build(); // Feature map. Features feature = Features.newBuilder().putFeature("feature0", feature0) .putFeature("feature1", feature1).putFeature("feature2", feature2) .putFeature("feature3", feature3).build(); // Example. Example example = Example.newBuilder().setFeatures(feature).build(); // Write to TFRecord file. context.write(new BytesWritable(example.toByteArray()), NullWritable.get()); 復制代碼
需要注意的是,為了匹配正在使用的 hadoop 版本,你可能需要修改 tensorflow-hadoop 源碼中的 pom.xml 文件,將 hadoop.version 設置為你正在使用的 hadoop 版本並使用 maven 工具重新編譯該項目,然后將生成的 jar 包引入到 MapReduce 項目中,避免因版本不匹配而報錯。
另外,為了使 MapReduce 項目能正常編譯,你還需引入 org.tensorflow:proto 庫以及 com.google.protobuf:protobuf-java 庫,可以從 maven 官方倉庫搜索這 2 個庫的最新版本並加入到 gradle 或 maven 項目的配置文件中,然后再進行項目編譯即可。
使用 TFRecorder 生成
由於在生成 TFRecord 文件時往往需要編寫大量的復雜代碼,為了優化代碼的復雜度, TensorFlow 官方開源了 TensorFlow Recorder 項目(即 TFRecorder)來更為便捷地生成 TFRecord 文件。
TFRecorder 允許用戶從 Pandas dataframe 或 CSV 直接生成 TFRecords 文件,而無需編寫任何復雜的代碼。其對於圖像數據的處理尤其方便,在 TFRecorder 之前,要大規模生成 TFRecord 格式的圖像數據,必須編寫一個數據流水線來從存儲中加載圖像並將結果序列化為 TFRecord 格式,而現在只需幾行代碼即可生成基於圖像的 TFRecord 文件。示例代碼如下所示:
import pandas as pd import tfrecorder # From Pandas DataFrame df = pd.read_csv('/path/to/data.csv') df.tensorflow.to_tfr(output_dir='/my/output/path') # From CSV tfrecorder.create_tfrecords( source='/path/to/data.csv', output_dir='/my/output/path', ) # From an image directory tfrecorder.create_tfrecords( source='/path/to/image_dir', output_dir='/my/output/path', ) 復制代碼
更多 TFRecorder 的用法請參考其官方文檔。
TFRecord 文件讀取
TensorFlow 提供了專用於讀取 TFRecord 文件的 API 接口 tf.data.TFRecordDataset ,該接口可以將 TFRecord 文件中的內容讀取到 dataset 中。代碼如下所示:
# Read TFRecord file to dataset. raw_dataset = tf.data.TFRecordDataset(filename) print(raw_dataset) 復制代碼
此時 dataset 中存儲的是序列化格式的字符串,如果要將其解析為真實的值,還需要進一步操作。
還原為 Example
我們可以將 raw_dataset 中的每個元素都還原為 tf.train.Example ,一般在小范圍對 TFRecord 數據進行檢查時使用,示例代碼如下所示:
for raw_record in raw_dataset.take(1): example = tf.train.Example() example.ParseFromString(raw_record.numpy()) print(example) # or example_proto = tf.train.Example.FromString(raw_record.numpy()) print(example_proto) 復制代碼
模型訓練使用
為了在模型訓練時使用該 dataset ,我們需要將 raw_dataset 中的每個元素都解析為 FeatureMap ,以匹配 Keras 模型的輸入與輸出。代碼如下所示:
def _parse_function(example_proto): # Create a description of the features. feature_description = { 'feature0': tf.io.FixedLenFeature([], tf.int64, default_value=0), 'feature1': tf.io.FixedLenFeature([], tf.int64, default_value=0), 'feature2': tf.io.FixedLenFeature([], tf.string, default_value=''), 'feature3': tf.io.FixedLenFeature([], tf.float32, default_value=0.0), } # Parse the single input `tf.train.Example` proto using the dictionary above. # return tf.io.parse_single_example(example_proto, feature_description) # Parse the batch input tf.train.Example protos using the dictionary above. return tf.io.parse_example(example_proto, feature_description) # Print parsed dataset. parsed_dataset = raw_dataset.map(_parse_function) print(parsed_dataset) # Print one element in parsed_dataset. for parsed_record in parsed_dataset.take(1): print(parsed_record) 復制代碼
這里使用了 tf.io.parse_example 函數來將序列化后字符串解析為指定的數據類型,我們需要提前准備好 feature_description 字典, 該字典定義了 feature 的名稱、長度(定長/變長)、數據類型以及默認值,以供在解析時使用。最終我們通過調用 raw_dataset 的 map 方法來將該解析函數應用到 dataset 中的每個元素。
另外,我們也可以使用 tf.io.parse_single_example 函數來進行解析,但要注意它與 tf.io.parse_example 的區別,前者適合解析單個序列化的元素,而后者適用於一個 batch 的解析。在TensorFlow 數據輸入的最佳實踐一文中曾介紹過 dataset 的向量化 map 操作,即對 dataset 先應用 batch 轉換然后再應用 map 轉換以提升效率,因此這里推薦使用后者作為序列化數據的解析函數。
最終我們可以將 parsed_dataset 應用於模型的訓練中。示例代碼如下所示:
model.fit(parsed_dataset)
復制代碼
參考資料
作者:AlexanderJLiu
鏈接:https://juejin.im/post/6891461642287054856
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
