1. Tensorflow高效流水線Pipeline


1. Tensorflow高效流水線Pipeline

2. Tensorflow的數據處理中的Dataset和Iterator

3. Tensorflow生成TFRecord

4. Tensorflow的Estimator實踐原理

1. 前言

GPU和TPU可以顯著縮短執行單個訓練步所需的時間。實現最高性能需要高效的輸入流水線,以在當前時間步完成之前為下一步提供數據。tf.data API可以幫助我們構建靈活高效的輸入流水線。本文檔介紹了 tf.data API的功能,以及在各種模型和加速器上構建高性能TensorFlow輸入流水線的最佳做法

2. Pipeline Structure輸入流水線結構

我們可以將典型的 TensorFlow 訓練輸入流水線視為 ETL 流程:

  1. Extract:從永久性存儲(可以是 HDD 或 SSD 等本地存儲或 GCS 或 HDFS 等遠程存儲)讀取數據。
  2. Transform:使用CPU核心解析數據並對其執行預處理操作,例如圖像解壓縮、數據增強轉換(例如隨機裁剪、翻轉和顏色失真)、重排和批處理。
  3. Load:將轉換后的數據加載到執行機器學習模型的加速器設備(例如,GPU 或 TPU)上。

這種模式可高效利用 CPU,同時預留加速器來完成對模型進行訓練的繁重工作。此外,將輸入流水線視為 ETL 流程可提供便於應用性能優化的結構。

使用 tf.estimator.Estimator API 時,前兩個階段(提取和轉換)是在 input_fn(傳遞給 tf.estimator.Estimator.train)中捕獲的。代碼可能如以下(簡單序列)實現所示:

def parse_fn(example):
  "Parse TFExample records and perform simple data augmentation."
  example_fmt = {
    "image": tf.FixedLengthFeature((), tf.string, ""),
    "label": tf.FixedLengthFeature((), tf.int64, -1)
  }
  parsed = tf.parse_single_example(example, example_fmt)
  image = tf.image.decode_image(parsed["image"])
  image = _augment_helper(image)  # augments image using slice, reshape, resize_bilinear
  return image, parsed["label"]

def input_fn():
  files = tf.data.Dataset.list_files("/path/to/dataset/train-*.tfrecord")
  dataset = files.interleave(tf.data.TFRecordDataset)
  dataset = dataset.shuffle(buffer_size=FLAGS.shuffle_buffer_size)
  dataset = dataset.map(map_func=parse_fn)
  dataset = dataset.batch(batch_size=FLAGS.batch_size)
  return dataset

2.1 最佳Pipeline步驟

在這里先給出最佳做法,如果同學們只想知道怎么做,直接參考這里就可以啦。

下面的內容是針對每一點優化的原理。

3. 優化性能

由於新型計算設備(例如 GPU 和 TPU)可以不斷提高神經網絡的訓練速度,因此,CPU 處理很容易成為瓶頸。tf.data API 為用戶提供構建塊來設計可高效利用 CPU 的輸入流水線,並優化 ETL 流程的每個步驟。

3.1 prefetch預取數據

要執行訓練步驟,您必須首先提取並轉換訓練數據,然后將其提供給在加速器上運行的模型。但是,在一個簡單的同步實現中,當 CPU 正在准備數據時,加速器處於空閑狀態。相反,當加速器正在訓練模型時,CPU 處於空閑狀態。因此,訓練步的用時是 CPU 預處理時間和加速器訓練時間的總和。

流水線將訓練步驟的預處理和模型執行過程重疊到一起。當加速器正在執行第 N 個訓練步時,CPU 正在准備第 N+1 步的數據。這樣做不僅可以最大限度地縮短訓練的單步用時(而不是總用時),而且可以縮短提取和轉換數據所需的時間。

如果不使用流水線,CPU 和 GPU/TPU 在大部分時間都處於空閑狀態:

image

使用流水線可顯著減少空閑時間:
image

tf.data API 通過 tf.data.Dataset.prefetch 轉換提供了一種軟件流水線機制,該機制可用於將生成數據的時間和使用數據的時間分離開。具體而言,該轉換使用后台線程和內部緩沖區,以便在請求元素之前從輸入數據集中預取這些元素。因此,為了實現上圖所示的流水線效果,您可以將 prefetch() 作為最終轉換添加到數據集流水線中(如果單步訓練使用 n 個元素,則添加 prefetch(n))。

要將此項更改應用於我們正在運行的示例,請將:

dataset = dataset.batch(batch_size=FLAGS.batch_size)
return dataset

更改為:

dataset = dataset.batch(batch_size=FLAGS.batch_size)
dataset = dataset.prefetch(buffer_size=FLAGS.prefetch_buffer_size)
return dataset

3.2 map並行處理數據轉換

准備批次數據時,可能需要預處理輸入元素。為此,tf.data API 提供了 tf.data.Dataset.map 轉換,以將用戶定義的函數(例如,正在運行的示例的 parse_fn)應用於輸入數據集的每個元素。由於輸入元素彼此獨立,因此可以跨多個 CPU 核心並行執行預處理。為實現這一點,map 轉換提供了 num_parallel_calls 參數來指定並行處理級別。例如,下圖說明了將 num_parallel_calls=2 設置為 map 轉換的效果:

image

並行后,由於數據預處理的時間縮短,整體的時間也減少了。如何為 num_parallel_calls 參數選擇最佳值取決於硬件、訓練數據的特征(例如其大小和形狀)、映射函數的成本以及同時在 CPU 上進行的其他處理;一個簡單的啟發法是設為可用 CPU 核心的數量。例如,如果執行以上示例的機器有 4 個核心,則設置 num_parallel_calls=4 會更高效。另一方面,將 num_parallel_calls 設置為遠大於可用 CPU 數量的值可能會導致調度效率低下,進而減慢速度。

要將此項更改應用於我們正在運行的示例,請將:

dataset = dataset.map(map_func=parse_fn)

更改為:

dataset = dataset.map(map_func=parse_fn, num_parallel_calls=FLAGS.num_parallel_calls)

此外,如果批次大小為數百或數千,那么並行處理批次創建過程還可能給流水線帶來更大的優勢。為此,tf.data API 提供了 tf.contrib.data.map_and_batch 轉換,它可以有效地將映射和批次轉換“混合”在一起。

要將此項更改應用於我們正在運行的示例,請將:

dataset = dataset.map(map_func=parse_fn, num_parallel_calls=FLAGS.num_parallel_calls)
dataset = dataset.batch(batch_size=FLAGS.batch_size)

更改為:

dataset = dataset.apply(tf.contrib.data.map_and_batch(
    map_func=parse_fn, batch_size=FLAGS.batch_size))

3.3 並行處理遠程數據提取

在實際設置中,輸入數據可能會遠程存儲(例如,GCS 或 HDFS),這是因為輸入數據不適合本地存儲,或因為訓練是分布式訓練,因此在每台機器上復制輸入數據沒有意義。非常適合在本地讀取數據的數據集流水線在遠程讀取數據時可能會遇到 I/O 瓶頸,這是因為本地存儲和遠程存儲之間存在以下差異:

  • 首字節時間:與本地存儲相比,從遠程存儲讀取文件的首字節所用時間可能要多出幾個數量級。
  • 讀取吞吐量:雖然遠程存儲通常可提供較大的聚合帶寬,但讀取單個文件可能只能利用此帶寬的一小部分。

此外,將原始字節讀入內存中后,可能還需要對數據進行反序列化或解密(例如,protobuf),這會帶來額外的開銷。無論數據是在本地還是遠程存儲,都存在這種開銷,但如果未有效預取數據,則在遠程存儲的情況下可能更糟。

為了降低各種數據提取開銷的影響,tf.data API 提供了 tf.contrib.data.parallel_interleave 轉換。使用此轉換可以並行執行其他數據集(例如數據文件讀取器)並交錯這些數據集的內容。可以通過 cycle_length 參數指定要重疊的數據集的數量。

下圖說明了為 parallel_interleave 轉換提供 cycle_length=2 的效果:

image
要將此項更改應用於我們正在運行的示例,請將:

dataset = files.interleave(tf.data.TFRecordDataset)

更改為:

dataset = files.apply(tf.contrib.data.parallel_interleave(
    tf.data.TFRecordDataset, cycle_length=FLAGS.num_parallel_readers))

由於負載或網絡事件,遠程存儲系統的吞吐量可能會隨時間而變化。鑒於這種差異,parallel_interleave 轉換可以選擇使用預取(如需了解詳情,請參閱 tf.contrib.data.parallel_interleave)。

默認情況下,parallel_interleave 轉換可提供元素的確定性排序以幫助實現可再現性。作為預取的替代方案(在某些情況下可能效率低下),parallel_interleave 轉換還提供了一個可提升性能但無法保證排序的選項。特別是,如果 sloppy 參數設為 true,則該轉換可在系統請求下一個元素時暫時跳過其元素不可用的文件,從而放棄該轉換的確定性排序。

4. 性能考慮因素

tf.data API 圍繞可組合轉換而設計,旨在為用戶提供靈活性。雖然這些轉換中有很多都是可以交替的,但某些轉換的順序會對性能產生影響。

4.1 map映射和batch批次

調用傳遞給 map 轉換的用戶定義函數具有與調度和執行用戶定義函數相關的開銷。通常,與函數執行的計算量相比,這種開銷很小。但是,如果 map 幾乎不起作用,那么這種開銷可能會占總成本的很大一部分。在這種情況下,建議向量化用戶定義的函數(即,讓該函數一次對一批輸入進行操作),並在 map 轉換之前先應用 batch 轉換
或者直接更改為如下代碼:

dataset = dataset.apply(tf.contrib.data.map_and_batch(
    map_func=parse_fn, batch_size=FLAGS.batch_size))

4.2 map映射和cache緩存

tf.data.Dataset.cache 轉換可以在內存或本地存儲中緩存數據集。如果傳遞給 map 轉換的用戶定義函數代價很高,則只要內存或本地存儲仍可以容納生成的數據集,就可以在映射轉換后應用緩存轉換。如果用戶定義的函數會增加存儲數據集所需的空間,並超出緩存容量,請考慮在訓練作業之前預處理數據以減少資源消耗量。

4.3 map映射和interleave交錯/prefetch預取/shuffle重排

許多轉換(包括map interleave、prefetch 和 shuffle)都維持一個內部元素緩沖區。如果傳遞給 map 轉換的用戶定義函數改變了元素的大小,那么映射轉換的順序和緩沖元素的轉換會影響內存使用量。通常,我們建議選擇可以減少內存占用的順序,除非為了提高性能而需要采用不同的順序(例如,為了混合映射和批次轉換)。

4.4 repeat重復和shuffle重排

tf.data.Dataset.repeat 轉換會將輸入數據重復有限(或無限)次;每次數據重復通常稱為一個周期。tf.data.Dataset.shuffle 轉換會隨機化數據集樣本的順序。

如果在 shuffle 轉換之前應用 repeat 轉換,則系統會對周期邊界進行模糊處理。也就是說,某些元素可以在其他元素出現之前重復出現。另一方面,如果在重復轉換之前應用 shuffle 轉換,那么在每個周期開始時性能可能會降低,因為需要初始化 shuffle 轉換的內部狀態。換言之,前者(repeat 在 shuffle 之前)可提供更好的性能,而后者(repeat 在 shuffle 之前)可提供更強的排序保證。

如果可能,建議您使用 tf.contrib.data.shuffle_and_repeat 混合轉換,這樣可以達到兩全其美的效果(良好的性能和強大的排序保證)。否則,我們建議在repeat重復之前進行shuffle重排


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM