「導語」 TFRecord 是 TensorFlow 生態中的一個重要組件,它是一種二進制序列的存儲格式,使用該格式可以使輸入數據的讀取和處理更為高效,從而提升整體訓練流程的速度,另外,它還具有極高的靈活性,可以為復雜特征數據的構建與解析提供便利。本文將對 TFRecord 數據文件的生成與讀取流程進行詳細地介紹,並提供相應的示例代碼作為參考。
TFRecord 格式簡介
TFRecord
是 TensorFlow
生態中一個重要的組件,其本質上是一種文件格式,用於存儲二進制序列的內容。 TFRecord
文件由序列化后的 protobuf
數據構成,可以直接供 TensorFlow
程序讀取並用於模型訓練。
對於文本格式的數據文件而言,其存儲和讀取的開銷都比較大,相比之下, TFRecord
格式的數據文件占用的磁盤空間會更少,而且可以更高效地進行數據讀取,因此將數據使用 TFRecord
格式存儲能夠在一定程度上提升數據處理的效率。
另外,文本格式的數據更適合處理定長且維度單一的
特征數據,對於變長以及多維度的
特征數據的處理會比較麻煩,而 TFRecord
格式的數據則沒有這一限制,這給數據讀取與處理帶來了極大的靈活性。
TFRecord ProtoBuf
在生成或者讀取 TFRecord
文件之前,我們需要對與 TFRecord
格式相關的 protobuf
協議有一定的了解,它們是構成 TFRecord
文件的關鍵部分。
Example
Example
指一個數據輸入樣本,它由一系列的特征組成。其 protobuf
格式如下所示:
message Example { Features features = 1; } 復制代碼
上面的 Features
即指特征組合,其 protobuf
格式如下所示:
message Features { // Map from feature name to feature. map<string, Feature> feature = 1; } 復制代碼
這里 Features
使用 map
結構來進行存儲,其中 map
的 key
表示特征的名稱,為 string
類型, value
表示具體的特征值,為 Feature
類型。
另外,可以看到 Example
只是封裝了一下 Features
的結構,其實質與 Features
是等同的。
Feature
Feature
指具體的特征值,其 protobuf
格式如下所示:
message Feature { // Each feature can be exactly one kind. oneof kind { BytesList bytes_list = 1; FloatList float_list = 2; Int64List int64_list = 3; } } 復制代碼
從 Feature
的定義可以看出,它可以接收三種格式的數據,分別為:
-
BytesList
格式,可以表示string
或byte
類型的數據。 -
FloatList
格式,可以表示float
和double
類型的數據。 -
Int64List
格式,可以表示bool
類型、enum
類型、int32
類型、uint32
類型、int64
類型以及uint64
等多種類型的數據。
以上三種格式,基本囊括了所有常見的數據輸入類型,在生成和解析 TFRecord
數據時,可以根據具體的數據類型來使用相應的 Feature
結構。
Python 實現
在將上述 protobuf
文件轉為語言相關的數據結構后,即可使用這些結構來構建特征數據並進行序列化了。下面以 python
語言為例,來介紹這些數據結構的具體使用方法。(注:在 TensorFlow
的 python
安裝包里已經包含了 Example
以及 Feature
等相關的數據結構,我們可以直接使用而無需再用 protoc
工具生成了。)
首先,我們需要構建 Feature
對象。因為 Feature
結構支持 3
種格式的數據,所以這里使用 3
個函數來分別生成不同類型的 Feature
對象。示例代碼如下所示:
import tensorflow as tf import numpy as np def _bytes_feature(value): """Returns a bytes_list from a string / byte.""" if isinstance(value, type(tf.constant(0))): # BytesList won't unpack a string from an EagerTensor. value = value.numpy() return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) def _float_feature(value): """Returns a float_list from a float / double.""" return tf.train.Feature(float_list=tf.train.FloatList(value=[value])) def _int64_feature(value): """Returns an int64_list from a bool / enum / int / uint.""" return tf.train.Feature(int64_list=tf.train.Int64List(value=[value])) # Print BytesList. print(_bytes_feature(b'test_string')) print(_bytes_feature(u'test_bytes'.encode('utf-8'))) # Print FloatList. print(_float_feature(np.exp(1))) # Print Int64List. print(_int64_feature(True)) print(_int64_feature(1)) # Serialize and Deserialize Feature. serialized_feature = _float_feature(np.exp(1)).SerializeToString() print(serialized_feature) feature_proto = tf.train.Feature.FromString(serialized_feature) print(feature_proto) 復制代碼
其中 _bytes_feature
、 _float_feature
和 _int64_feature
函數分別用來生成 BytesList
、 FloatList
和 Int64List
格式的 Feature
對象,這里假設這 3
個函數的參數 value
均為單個值。
由於 tf.train.*List
函數接收的參數均為列表 (list
) 或數組 (array
),所以上面的代碼中才會加上 []
符號來表示列表,如果輸入的參數 value
已經為列表或數組則無需此操作。另外還需注意傳遞給 3
個函數的參數 value
的基礎類型要與具體的 *List
相匹配,否則會報錯。
在生成了 Feature
對象后,可以調用其 SerializeToString
方法對其進行序列化,從而生成序列化后的字符串。另外還可以使用 tf.train.Feature.FromString
方法來將序列化后的數據還原為 Feature
對象。
接着,我們就可以構建 Example
對象並對其進行序列化了。假設我們有 4
種 Feature
,它們分別為 boolean
類型的特征、 integer
類型的特征、 string
類型的特征和 float
類型的特征,我們首先將這 4
種特征通過上面 3
個函數編碼后返回相應的 Feature
對象,然后構建一個 Feature Map
字典並生成 Features
對象,最后使用 Features
對象生成 Example
對象並進行序列化。將上述流程統一到一個函數 serialize_example
中,其示例代碼如下所示:
def serialize_example(feature0, feature1, feature2, feature3): """ Creates a tf.train.Example message ready to be written to a file. """ # Create a dictionary mapping the feature name to the tf.train.Example-compatible feature = { 'feature0': _int64_feature(feature0), 'feature1': _int64_feature(feature1), 'feature2': _bytes_feature(feature2), 'feature3': _float_feature(feature3), } # Create a Features message using tf.train.Example. example_proto = tf.train.Example(features=tf.train.Features( feature=feature)) return example_proto.SerializeToString() # Serialize and Deserialize Example. serialized_example = serialize_example(False, 4, b'goat', 0.9876) print(serialized_example) example_proto = tf.train.Example.FromString(serialized_example) print(example_proto) 復制代碼
同樣地,可以調用 Example
對象的 SerializeToString
來將其序列化為字符串,調用 tf.train.Example.FromString
方法來將序列化后的 Example
對象還原。
TFRecord 文件生成
假設我們有 4
種類型的 Feature
,如上一節所述,並假設它們的原始數據 (numpy
) 生成方式如下述代碼所示:
# The number of observations in the dataset. n_observations = int(1e4) # Boolean feature, encoded as False or True. feature0 = np.random.choice([False, True], n_observations) # Integer feature, random from 0 to 4. feature1 = np.random.randint(0, 5, n_observations) # String feature. strings = np.array([b'cat', b'dog', b'chicken', b'horse', b'goat']) feature2 = strings[feature1] # Float feature, from a standard normal distribution. feature3 = np.random.randn(n_observations) print(feature0, feature1, feature2, feature3) 復制代碼
現在我們要使用這 4
種 Feature
來生成一個包含 10,000
個數據樣本的 TFRecord
文件,可以使用以下幾種方式進行生成。
使用 tf.data 生成
首先使用 tf.data.Dataset.from_tensor_slices
函數來生成一個包含原始數據類型的 dateset
,代碼如下所示:
features_dataset = tf.data.Dataset.from_tensor_slices(
(feature0, feature1, feature2, feature3))
# Print dataset. print(features_dataset) # Print one element in dataset. for f0, f1, f2, f3 in features_dataset.take(1): print(f0, f1, f2, f3) 復制代碼
接着我們使用上一節定義的 serialize_example
函數來生成一個包含序列化字符串類型的 dataset
,代碼如下所示:
def generator(): for features in features_dataset: yield serialize_example(*features) serialized_features_dataset = tf.data.Dataset.from_generator( generator, output_types=tf.string, output_shapes=(), ) # Print serialized dataset. print(serialized_features_dataset) # Print one element in serialized_features_dataset. for s in serialized_features_dataset.take(1): print(s) 復制代碼
最后我們將序列化后的 dataset
寫入 TFRecord
文件中,代碼如下所示:
filename = 'train.tfrecord' writer = tf.data.experimental.TFRecordWriter(filename) writer.write(serialized_features_dataset) 復制代碼
注意這里的 writer
使用的是 tf.data.experimental.TFRecordWriter
對象,它專用於將序列化的 dataset
對象寫入到 TFRecord
文件中,要與后面介紹的 tf.io.TFRecordWriter
對象區分開來。
使用 tf.io 生成
首先將每個數據樣本都轉為 tf.train.Example
對象並序列化,然后再將其寫入 TFRecord
文件中,這里同樣使用上面介紹過的 serialize_example
函數來進行序列化,代碼如下所示:
# Write the `tf.train.Example` observations to the file. with tf.io.TFRecordWriter(filename) as writer: for i in range(n_observations): example = serialize_example( feature0[i], feature1[i], feature2[i], feature3[i], ) writer.write(example) 復制代碼
這里的 writer
使用的是 tf.io.TFRecordWriter
對象,它直接將序列化的字符串寫入到 TFRecord
文件中。
一般情況下,這種生成 TFRecord
文件的方式在 python
中是最常使用的,在實際使用中可以根據具體情況進行選擇。
使用 MapReduce 生成
在數據處理環節,我們可能會使用 MapReduce
進行一些預處理操作,同時我們也希望可以直接借助 MapReduce
任務來生成多個 TFRecord
數據文件以供分布式訓練使用,為了滿足這一需求, TensorFlow
生態提供了一個擴展庫 tensorflow-hadoop
,它包含了 TFRecord
格式的 MapReduce InputFormat
和 OutputFormat
實現。利用這個擴展庫,我們就可以直接使用 MapReduce
任務來生成和讀取 TFRecord
文件了。部分示例代碼如下所示:
// Main function. import org.tensorflow.hadoop.io.TFRecordFileOutputFormat; Job job = Job.getInstance(config, "TFRecord"); job.setOutputFormatClass(TFRecordFileOutputFormat.class); // Mapper or Reducer. import java.util.Arrays; import com.google.protobuf.ByteString; import org.tensorflow.example.BytesList; import org.tensorflow.example.Example; import org.tensorflow.example.Feature; import org.tensorflow.example.Features; import org.tensorflow.example.FloatList; import org.tensorflow.example.Int64List; // map or reduce function // *List value. Int64List value0 = Int64List.newBuilder().addAllValue(Arrays.asList(0L)).build(); Int64List value1 = Int64List.newBuilder().addAllValue(Arrays.asList(4L)).build(); BytesList value2 = BytesList.newBuilder() .addAllValue(Arrays.asList(ByteString.copyFrom("goat".getBytes()))).build(); FloatList value3 = FloatList.newBuilder().addAllValue(Arrays.asList(0.9876f)).build(); // All features. Feature feature0 = Feature.newBuilder().setInt64List(value0).build(); Feature feature1 = Feature.newBuilder().setInt64List(value1).build(); Feature feature2 = Feature.newBuilder().setBytesList(value2).build(); Feature feature3 = Feature.newBuilder().setFloatList(value3).build(); // Feature map. Features feature = Features.newBuilder().putFeature("feature0", feature0) .putFeature("feature1", feature1).putFeature("feature2", feature2) .putFeature("feature3", feature3).build(); // Example. Example example = Example.newBuilder().setFeatures(feature).build(); // Write to TFRecord file. context.write(new BytesWritable(example.toByteArray()), NullWritable.get()); 復制代碼
需要注意的是,為了匹配正在使用的 hadoop
版本,你可能需要修改 tensorflow-hadoop
源碼中的 pom.xml
文件,將 hadoop.version
設置為你正在使用的 hadoop
版本並使用 maven
工具重新編譯該項目,然后將生成的 jar
包引入到 MapReduce
項目中,避免因版本不匹配而報錯。
另外,為了使 MapReduce
項目能正常編譯,你還需引入 org.tensorflow:proto
庫以及 com.google.protobuf:protobuf-java
庫,可以從 maven
官方倉庫搜索這 2
個庫的最新版本並加入到 gradle
或 maven
項目的配置文件中,然后再進行項目編譯即可。
使用 TFRecorder 生成
由於在生成 TFRecord
文件時往往需要編寫大量的復雜代碼,為了優化代碼的復雜度, TensorFlow
官方開源了 TensorFlow Recorder
項目(即 TFRecorder
)來更為便捷地生成 TFRecord
文件。
TFRecorder
允許用戶從 Pandas dataframe
或 CSV
直接生成 TFRecords
文件,而無需編寫任何復雜的代碼。其對於圖像數據的處理尤其方便,在 TFRecorder
之前,要大規模生成 TFRecord
格式的圖像數據,必須編寫一個數據流水線來從存儲中加載圖像並將結果序列化為 TFRecord
格式,而現在只需幾行代碼即可生成基於圖像的 TFRecord
文件。示例代碼如下所示:
import pandas as pd import tfrecorder # From Pandas DataFrame df = pd.read_csv('/path/to/data.csv') df.tensorflow.to_tfr(output_dir='/my/output/path') # From CSV tfrecorder.create_tfrecords( source='/path/to/data.csv', output_dir='/my/output/path', ) # From an image directory tfrecorder.create_tfrecords( source='/path/to/image_dir', output_dir='/my/output/path', ) 復制代碼
更多 TFRecorder
的用法請參考其官方文檔。
TFRecord 文件讀取
TensorFlow
提供了專用於讀取 TFRecord
文件的 API
接口 tf.data.TFRecordDataset
,該接口可以將 TFRecord
文件中的內容讀取到 dataset
中。代碼如下所示:
# Read TFRecord file to dataset. raw_dataset = tf.data.TFRecordDataset(filename) print(raw_dataset) 復制代碼
此時 dataset
中存儲的是序列化格式的字符串,如果要將其解析為真實的值,還需要進一步操作。
還原為 Example
我們可以將 raw_dataset
中的每個元素都還原為 tf.train.Example
,一般在小范圍對 TFRecord
數據進行檢查時使用,示例代碼如下所示:
for raw_record in raw_dataset.take(1): example = tf.train.Example() example.ParseFromString(raw_record.numpy()) print(example) # or example_proto = tf.train.Example.FromString(raw_record.numpy()) print(example_proto) 復制代碼
模型訓練使用
為了在模型訓練時使用該 dataset
,我們需要將 raw_dataset
中的每個元素都解析為 FeatureMap
,以匹配 Keras
模型的輸入與輸出。代碼如下所示:
def _parse_function(example_proto): # Create a description of the features. feature_description = { 'feature0': tf.io.FixedLenFeature([], tf.int64, default_value=0), 'feature1': tf.io.FixedLenFeature([], tf.int64, default_value=0), 'feature2': tf.io.FixedLenFeature([], tf.string, default_value=''), 'feature3': tf.io.FixedLenFeature([], tf.float32, default_value=0.0), } # Parse the single input `tf.train.Example` proto using the dictionary above. # return tf.io.parse_single_example(example_proto, feature_description) # Parse the batch input tf.train.Example protos using the dictionary above. return tf.io.parse_example(example_proto, feature_description) # Print parsed dataset. parsed_dataset = raw_dataset.map(_parse_function) print(parsed_dataset) # Print one element in parsed_dataset. for parsed_record in parsed_dataset.take(1): print(parsed_record) 復制代碼
這里使用了 tf.io.parse_example
函數來將序列化后字符串解析為指定的數據類型,我們需要提前准備好 feature_description
字典, 該字典定義了 feature
的名稱、長度(定長/變長)、數據類型以及默認值,以供在解析時使用。最終我們通過調用 raw_dataset
的 map
方法來將該解析函數應用到 dataset
中的每個元素。
另外,我們也可以使用 tf.io.parse_single_example
函數來進行解析,但要注意它與 tf.io.parse_example
的區別,前者適合解析單個序列化的元素,而后者適用於一個 batch
的解析。在TensorFlow 數據輸入的最佳實踐
一文中曾介紹過 dataset
的向量化 map
操作,即對 dataset
先應用 batch
轉換然后再應用 map
轉換以提升效率,因此這里推薦使用后者作為序列化數據的解析函數。
最終我們可以將 parsed_dataset
應用於模型的訓練中。示例代碼如下所示:
model.fit(parsed_dataset)
復制代碼
參考資料
作者:AlexanderJLiu
鏈接:https://juejin.im/post/6891461642287054856
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。