TensorFlow 數據輸入格式之 TFRecord


導語」 TFRecord 是 TensorFlow 生態中的一個重要組件,它是一種二進制序列的存儲格式,使用該格式可以使輸入數據的讀取和處理更為高效,從而提升整體訓練流程的速度,另外,它還具有極高的靈活性,可以為復雜特征數據的構建與解析提供便利。本文將對 TFRecord 數據文件的生成與讀取流程進行詳細地介紹,並提供相應的示例代碼作為參考。

TFRecord 格式簡介

TFRecordTensorFlow 生態中一個重要的組件,其本質上是一種文件格式,用於存儲二進制序列的內容。 TFRecord 文件由序列化后的 protobuf 數據構成,可以直接供 TensorFlow 程序讀取並用於模型訓練。

對於文本格式的數據文件而言,其存儲和讀取的開銷都比較大,相比之下, TFRecord 格式的數據文件占用的磁盤空間會更少,而且可以更高效地進行數據讀取,因此將數據使用 TFRecord 格式存儲能夠在一定程度上提升數據處理的效率。

另外,文本格式的數據更適合處理定長且維度單一的特征數據,對於變長以及多維度的特征數據的處理會比較麻煩,而 TFRecord 格式的數據則沒有這一限制,這給數據讀取與處理帶來了極大的靈活性。

TFRecord ProtoBuf

在生成或者讀取 TFRecord 文件之前,我們需要對與 TFRecord 格式相關的 protobuf 協議有一定的了解,它們是構成 TFRecord 文件的關鍵部分。

Example

Example 指一個數據輸入樣本,它由一系列的特征組成。其 protobuf 格式如下所示:

message Example { Features features = 1; } 復制代碼

上面的 Features 即指特征組合,其 protobuf 格式如下所示:

message Features { // Map from feature name to feature. map<string, Feature> feature = 1; } 復制代碼

這里 Features 使用 map 結構來進行存儲,其中 mapkey 表示特征的名稱,為 string 類型, value 表示具體的特征值,為 Feature 類型。

另外,可以看到 Example 只是封裝了一下 Features 的結構,其實質與 Features 是等同的。

Feature

Feature 指具體的特征值,其 protobuf 格式如下所示:

message Feature { // Each feature can be exactly one kind. oneof kind { BytesList bytes_list = 1; FloatList float_list = 2; Int64List int64_list = 3; } } 復制代碼

Feature 的定義可以看出,它可以接收三種格式的數據,分別為:

  1. BytesList 格式,可以表示 stringbyte 類型的數據。

  2. FloatList 格式,可以表示 floatdouble 類型的數據。

  3. Int64List 格式,可以表示 bool 類型、 enum 類型、 int32 類型、 uint32 類型、 int64 類型以及 uint64 等多種類型的數據。

以上三種格式,基本囊括了所有常見的數據輸入類型,在生成和解析 TFRecord 數據時,可以根據具體的數據類型來使用相應的 Feature 結構。

Python 實現

在將上述 protobuf 文件轉為語言相關的數據結構后,即可使用這些結構來構建特征數據並進行序列化了。下面以 python 語言為例,來介紹這些數據結構的具體使用方法。(注:在 TensorFlowpython 安裝包里已經包含了 Example 以及 Feature 等相關的數據結構,我們可以直接使用而無需再用 protoc 工具生成了。)

首先,我們需要構建 Feature 對象。因為 Feature 結構支持 3 種格式的數據,所以這里使用 3 個函數來分別生成不同類型的 Feature 對象。示例代碼如下所示:

import tensorflow as tf import numpy as np def _bytes_feature(value): """Returns a bytes_list from a string / byte.""" if isinstance(value, type(tf.constant(0))): # BytesList won't unpack a string from an EagerTensor. value = value.numpy() return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) def _float_feature(value): """Returns a float_list from a float / double.""" return tf.train.Feature(float_list=tf.train.FloatList(value=[value])) def _int64_feature(value): """Returns an int64_list from a bool / enum / int / uint.""" return tf.train.Feature(int64_list=tf.train.Int64List(value=[value])) # Print BytesList. print(_bytes_feature(b'test_string')) print(_bytes_feature(u'test_bytes'.encode('utf-8'))) # Print FloatList. print(_float_feature(np.exp(1))) # Print Int64List. print(_int64_feature(True)) print(_int64_feature(1)) # Serialize and Deserialize Feature. serialized_feature = _float_feature(np.exp(1)).SerializeToString() print(serialized_feature) feature_proto = tf.train.Feature.FromString(serialized_feature) print(feature_proto) 復制代碼

其中 _bytes_feature_float_feature_int64_feature 函數分別用來生成 BytesListFloatListInt64List 格式的 Feature 對象,這里假設這 3 個函數的參數 value 均為單個值。

由於 tf.train.*List 函數接收的參數均為列表 (list) 或數組 (array),所以上面的代碼中才會加上 [] 符號來表示列表,如果輸入的參數 value 已經為列表或數組則無需此操作。另外還需注意傳遞給 3 個函數的參數 value 的基礎類型要與具體的 *List 相匹配,否則會報錯。

在生成了 Feature 對象后,可以調用其 SerializeToString 方法對其進行序列化,從而生成序列化后的字符串。另外還可以使用 tf.train.Feature.FromString 方法來將序列化后的數據還原為 Feature 對象。

接着,我們就可以構建 Example 對象並對其進行序列化了。假設我們有 4Feature ,它們分別為 boolean 類型的特征、 integer 類型的特征、 string 類型的特征和 float 類型的特征,我們首先將這 4 種特征通過上面 3 個函數編碼后返回相應的 Feature 對象,然后構建一個 Feature Map 字典並生成 Features 對象,最后使用 Features 對象生成 Example 對象並進行序列化。將上述流程統一到一個函數 serialize_example 中,其示例代碼如下所示:

def serialize_example(feature0, feature1, feature2, feature3): """ Creates a tf.train.Example message ready to be written to a file. """ # Create a dictionary mapping the feature name to the tf.train.Example-compatible feature = { 'feature0': _int64_feature(feature0), 'feature1': _int64_feature(feature1), 'feature2': _bytes_feature(feature2), 'feature3': _float_feature(feature3), } # Create a Features message using tf.train.Example. example_proto = tf.train.Example(features=tf.train.Features( feature=feature)) return example_proto.SerializeToString() # Serialize and Deserialize Example. serialized_example = serialize_example(False, 4, b'goat', 0.9876) print(serialized_example) example_proto = tf.train.Example.FromString(serialized_example) print(example_proto) 復制代碼

同樣地,可以調用 Example 對象的 SerializeToString 來將其序列化為字符串,調用 tf.train.Example.FromString 方法來將序列化后的 Example 對象還原。

TFRecord 文件生成

假設我們有 4 種類型的 Feature ,如上一節所述,並假設它們的原始數據 (numpy) 生成方式如下述代碼所示:

# The number of observations in the dataset. n_observations = int(1e4) # Boolean feature, encoded as False or True. feature0 = np.random.choice([False, True], n_observations) # Integer feature, random from 0 to 4. feature1 = np.random.randint(0, 5, n_observations) # String feature. strings = np.array([b'cat', b'dog', b'chicken', b'horse', b'goat']) feature2 = strings[feature1] # Float feature, from a standard normal distribution. feature3 = np.random.randn(n_observations) print(feature0, feature1, feature2, feature3) 復制代碼

現在我們要使用這 4Feature 來生成一個包含 10,000 個數據樣本的 TFRecord 文件,可以使用以下幾種方式進行生成。

使用 tf.data 生成

首先使用 tf.data.Dataset.from_tensor_slices 函數來生成一個包含原始數據類型的 dateset ,代碼如下所示:

features_dataset = tf.data.Dataset.from_tensor_slices(
    (feature0, feature1, feature2, feature3))
# Print dataset. print(features_dataset) # Print one element in dataset. for f0, f1, f2, f3 in features_dataset.take(1): print(f0, f1, f2, f3) 復制代碼

接着我們使用上一節定義的 serialize_example 函數來生成一個包含序列化字符串類型的 dataset ,代碼如下所示:

def generator(): for features in features_dataset: yield serialize_example(*features) serialized_features_dataset = tf.data.Dataset.from_generator( generator, output_types=tf.string, output_shapes=(), ) # Print serialized dataset. print(serialized_features_dataset) # Print one element in serialized_features_dataset. for s in serialized_features_dataset.take(1): print(s) 復制代碼

最后我們將序列化后的 dataset 寫入 TFRecord 文件中,代碼如下所示:

filename = 'train.tfrecord' writer = tf.data.experimental.TFRecordWriter(filename) writer.write(serialized_features_dataset) 復制代碼

注意這里的 writer 使用的是 tf.data.experimental.TFRecordWriter 對象,它專用於將序列化的 dataset 對象寫入到 TFRecord 文件中,要與后面介紹的 tf.io.TFRecordWriter 對象區分開來。

使用 tf.io 生成

首先將每個數據樣本都轉為 tf.train.Example 對象並序列化,然后再將其寫入 TFRecord 文件中,這里同樣使用上面介紹過的 serialize_example 函數來進行序列化,代碼如下所示:

# Write the `tf.train.Example` observations to the file. with tf.io.TFRecordWriter(filename) as writer: for i in range(n_observations): example = serialize_example( feature0[i], feature1[i], feature2[i], feature3[i], ) writer.write(example) 復制代碼

這里的 writer 使用的是 tf.io.TFRecordWriter 對象,它直接將序列化的字符串寫入到 TFRecord 文件中。

一般情況下,這種生成 TFRecord 文件的方式在 python 中是最常使用的,在實際使用中可以根據具體情況進行選擇。

使用 MapReduce 生成

在數據處理環節,我們可能會使用 MapReduce 進行一些預處理操作,同時我們也希望可以直接借助 MapReduce 任務來生成多個 TFRecord 數據文件以供分布式訓練使用,為了滿足這一需求, TensorFlow 生態提供了一個擴展庫 tensorflow-hadoop ,它包含了 TFRecord 格式的 MapReduce InputFormatOutputFormat 實現。利用這個擴展庫,我們就可以直接使用 MapReduce 任務來生成和讀取 TFRecord 文件了。部分示例代碼如下所示:

// Main function. import org.tensorflow.hadoop.io.TFRecordFileOutputFormat; Job job = Job.getInstance(config, "TFRecord"); job.setOutputFormatClass(TFRecordFileOutputFormat.class); // Mapper or Reducer. import java.util.Arrays; import com.google.protobuf.ByteString; import org.tensorflow.example.BytesList; import org.tensorflow.example.Example; import org.tensorflow.example.Feature; import org.tensorflow.example.Features; import org.tensorflow.example.FloatList; import org.tensorflow.example.Int64List; // map or reduce function // *List value. Int64List value0 = Int64List.newBuilder().addAllValue(Arrays.asList(0L)).build(); Int64List value1 = Int64List.newBuilder().addAllValue(Arrays.asList(4L)).build(); BytesList value2 = BytesList.newBuilder() .addAllValue(Arrays.asList(ByteString.copyFrom("goat".getBytes()))).build(); FloatList value3 = FloatList.newBuilder().addAllValue(Arrays.asList(0.9876f)).build(); // All features. Feature feature0 = Feature.newBuilder().setInt64List(value0).build(); Feature feature1 = Feature.newBuilder().setInt64List(value1).build(); Feature feature2 = Feature.newBuilder().setBytesList(value2).build(); Feature feature3 = Feature.newBuilder().setFloatList(value3).build(); // Feature map. Features feature = Features.newBuilder().putFeature("feature0", feature0) .putFeature("feature1", feature1).putFeature("feature2", feature2) .putFeature("feature3", feature3).build(); // Example. Example example = Example.newBuilder().setFeatures(feature).build(); // Write to TFRecord file. context.write(new BytesWritable(example.toByteArray()), NullWritable.get()); 復制代碼

需要注意的是,為了匹配正在使用的 hadoop 版本,你可能需要修改 tensorflow-hadoop 源碼中的 pom.xml 文件,將 hadoop.version 設置為你正在使用的 hadoop 版本並使用 maven 工具重新編譯該項目,然后將生成的 jar 包引入到 MapReduce 項目中,避免因版本不匹配而報錯。

另外,為了使 MapReduce 項目能正常編譯,你還需引入 org.tensorflow:proto 庫以及 com.google.protobuf:protobuf-java 庫,可以從 maven 官方倉庫搜索這 2 個庫的最新版本並加入到 gradlemaven 項目的配置文件中,然后再進行項目編譯即可。

使用 TFRecorder 生成

由於在生成 TFRecord 文件時往往需要編寫大量的復雜代碼,為了優化代碼的復雜度, TensorFlow 官方開源了 TensorFlow Recorder 項目(即 TFRecorder)來更為便捷地生成 TFRecord 文件。

TFRecorder 允許用戶從 Pandas dataframeCSV 直接生成 TFRecords 文件,而無需編寫任何復雜的代碼。其對於圖像數據的處理尤其方便,在 TFRecorder 之前,要大規模生成 TFRecord 格式的圖像數據,必須編寫一個數據流水線來從存儲中加載圖像並將結果序列化為 TFRecord 格式,而現在只需幾行代碼即可生成基於圖像的 TFRecord 文件。示例代碼如下所示:

import pandas as pd import tfrecorder # From Pandas DataFrame df = pd.read_csv('/path/to/data.csv') df.tensorflow.to_tfr(output_dir='/my/output/path') # From CSV tfrecorder.create_tfrecords( source='/path/to/data.csv', output_dir='/my/output/path', ) # From an image directory tfrecorder.create_tfrecords( source='/path/to/image_dir', output_dir='/my/output/path', ) 復制代碼

更多 TFRecorder 的用法請參考其官方文檔。

TFRecord 文件讀取

TensorFlow 提供了專用於讀取 TFRecord 文件的 API 接口 tf.data.TFRecordDataset ,該接口可以將 TFRecord 文件中的內容讀取到 dataset 中。代碼如下所示:

# Read TFRecord file to dataset. raw_dataset = tf.data.TFRecordDataset(filename) print(raw_dataset) 復制代碼

此時 dataset 中存儲的是序列化格式的字符串,如果要將其解析為真實的值,還需要進一步操作。

還原為 Example

我們可以將 raw_dataset 中的每個元素都還原為 tf.train.Example ,一般在小范圍對 TFRecord 數據進行檢查時使用,示例代碼如下所示:

for raw_record in raw_dataset.take(1): example = tf.train.Example() example.ParseFromString(raw_record.numpy()) print(example) # or example_proto = tf.train.Example.FromString(raw_record.numpy()) print(example_proto) 復制代碼

模型訓練使用

為了在模型訓練時使用該 dataset ,我們需要將 raw_dataset 中的每個元素都解析為 FeatureMap ,以匹配 Keras 模型的輸入與輸出。代碼如下所示:

def _parse_function(example_proto): # Create a description of the features. feature_description = { 'feature0': tf.io.FixedLenFeature([], tf.int64, default_value=0), 'feature1': tf.io.FixedLenFeature([], tf.int64, default_value=0), 'feature2': tf.io.FixedLenFeature([], tf.string, default_value=''), 'feature3': tf.io.FixedLenFeature([], tf.float32, default_value=0.0), } # Parse the single input `tf.train.Example` proto using the dictionary above. # return tf.io.parse_single_example(example_proto, feature_description) # Parse the batch input tf.train.Example protos using the dictionary above. return tf.io.parse_example(example_proto, feature_description) # Print parsed dataset. parsed_dataset = raw_dataset.map(_parse_function) print(parsed_dataset) # Print one element in parsed_dataset. for parsed_record in parsed_dataset.take(1): print(parsed_record) 復制代碼

這里使用了 tf.io.parse_example 函數來將序列化后字符串解析為指定的數據類型,我們需要提前准備好 feature_description 字典, 該字典定義了 feature 的名稱、長度(定長/變長)、數據類型以及默認值,以供在解析時使用。最終我們通過調用 raw_datasetmap 方法來將該解析函數應用到 dataset 中的每個元素。

另外,我們也可以使用 tf.io.parse_single_example 函數來進行解析,但要注意它與 tf.io.parse_example 的區別,前者適合解析單個序列化的元素,而后者適用於一個 batch 的解析。在TensorFlow 數據輸入的最佳實踐一文中曾介紹過 dataset 的向量化 map 操作,即對 dataset 先應用 batch 轉換然后再應用 map 轉換以提升效率,因此這里推薦使用后者作為序列化數據的解析函數。

最終我們可以將 parsed_dataset 應用於模型的訓練中。示例代碼如下所示:

model.fit(parsed_dataset)
復制代碼

參考資料

  1. TFRecord and tf.train.Example
  2. TFRecord 相關 ProtoBuf 文件地址
  3. 創建 TFRecords 的救星 — TensorFlow Recorder 現已開源!
  4. TFRecorder Github 地址
  5. Hadoop MapReduce InputFormat/OutputFormat for TFRecords

作者:AlexanderJLiu
鏈接:https://juejin.im/post/6891461642287054856
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM