《MapReduce: Simplified Data Processing on Large Clusters》論文研讀


MapReduce 論文研讀

說明:本文為論文 《MapReduce: Simplified Data Processing on Large Clusters》 的個人理解,難免有理解不到位之處,歡迎交流與指正 。

論文地址MapReduce Paper


1. MapReduce 編程模型

MapReduce 是 Google 提出的一種用於處理和生成大數據集的 編程模型 ,具象地可以理解成一個 框架

該框架含有兩個由用戶來實現的接口:mapreducemap 函數接收一個鍵值對,生成一個中間鍵值對集合,MapReduce 框架會將所有共用一個鍵的值組合在一起並傳遞給 reduce 函數,reduce 函數接收此中間鍵以及該鍵的值的集合,將這些值合並在一起,生成一組更小的值的集合 。

該編程模型中,數據形式變換可由以下模式表示:

map:	(k1, v1)	->	list(k2, v2)
reduce:	(k2, list(v2))	-> 	list(v3)

:論文中該模式第二行表示為 reduce: (k2, list(v2)) -> list(v2) ,個人認為由於通常情況下 reduce 會對 list<v2> 做一些處理(特殊情況下不做任何處理,即 reduce 為恆等函數),生成一些不同的值,所以用 list<v3> 進行表示可以區分處理前后的差異,更具一般化 。


2. 示例:文檔中單詞計數

論文中給出了 MapReduce 的經典使用示例,即 統計文檔中每個單詞出現次數word count 任務 ),通過此示例可以直觀了解到 MapReduce 的使用方法 。

由用戶實現的 mapreduce 函數的偽代碼為:

map(String key, String value):
	// key: document name
	// value: document contents
	for each word w in value:
		EmitIntermediate(w,"1");

reduce(String key, Iterator values):
	// key: a word
	// values: a list of counts
	int result = 0;
	for each v in values:
		result += ParseInt(v);
	Emit(AsString(result));

通過上述偽代碼可以看到:

  • 對於 map 函數,輸入一個鍵值對, key 為文件名,value 為文件內容,它對文件中每一個單詞都生成 中間鍵值對 <w, '1'> ,最后返回的內容為一個鍵值對的集合,表示為 list(<'cat', '1'>, <'dog', '1'>, ..., <'cat', '1'>, <'pig', '1'>)
  • 對於 reduce 函數,輸入一個鍵值對,key 為一個單詞 ,value 為該單詞對應的計數的列表,即 list('1', '1', '1', ..., '1') ,列表中 '1' 的個數即為文檔中該單詞出現的次數,最后將單詞出現的次數返回
  • list(<'cat', '1'>, <'dog', '1'>, ..., <'cat', '1'>, <'pig', '1'>) 轉化為 <'cat', list('1', '1', '1', ..., '1')> 的步驟是由 MapReduce 框架來執行的

上述過程可以圖示為:

論文附錄 A 有由 C++ 實現的針對文檔詞計數任務的 map 函數、reduce 函數 以及 調用兩接口的完整程序代碼,在此不做詳述 。


3. Google 的 MapReduce 實現

MapReduce 模型可以有多種不同的實現方式,論文主要介紹了一種在 Google 內部廣泛使用的計算環境下(通過以太網交換機連接,並由商用服務器所組成的大型集群)使用的 MapReduce 實現 。

3.1 執行流程

上圖為此 MapReduce 框架實現的示意圖,下文基於此圖對 MapReduce 的執行過程進行描述,描述的序號與圖中的序號相對應(這部分內容論文描述比較詳細,所以以翻譯為主,穿插個人理解以及補充后文中的優化細節):

  1. MapReduce 庫會先把文件切分成 M 個片段( 每個大小為 16MB~64MB ),存儲在 GFS 文件系統 ,接着,它會在集群中啟動多個 程序副本
  2. 這些程序副本中,一個為 master ,剩余為 workermasterworker 進行任務分配,共有 Mmap 任務以及 Rreduce 任務( M 同時為文件片段數 , R 由用戶指定),master 會給每個空閑的 worker 分配一個 map 任務或者一個 reduce 任務 。
  3. 被分配了 map 任務的 worker 會讀取相關的輸入數據片段,這些數據片段一般位於該 worker 所在的服務器上( master 調度時會優先使 map 任務執行在存儲有相關輸入數據的服務器上,通過這種 本地執行 的方式降低服務器間網絡通信,節約網絡帶寬 )。它會解析出輸入數據中的 鍵值對 ,並將它們傳入用戶定義的 Map 函數中,Map 函數所生成的 中間鍵值對 會被緩存在內存中 。( 要將 map 任務和用戶定義的 Map 函數區分開來,map 任務包含了一些前置處理以及 Map 函數的執行 ,reduce 任務和 Reduce 函數同理 )
  4. 每隔一段時間,被緩存的中間鍵值對會被寫入到本地硬盤,並通過分區函數(一般是哈希后取模)分到 R 個區域內 。這些被緩存的鍵值對在本地硬盤的位置會被傳回 mastermaster 負責將這些位置轉發給執行 reduce 任務的 worker
  5. 所有 map 任務執行結束后,master 才開始分發 reduce 任務 。當某個執行 reduce 任務的 workermaster 獲取到了這些位置信息,該 worker 就會通過 RPC 的方式從保存了對應緩存中間數據的 map workers 的本地硬盤中讀取數據 ( 輸入一個 reduce 任務中的中間數據會產生自所有 map 任務 )。當一個 reduce worker 讀完所有中間數據后,會 根據中間鍵進行排序,使得具有相同中間鍵的數據可以聚合在一起 。(需要排序是因為中間 key 的數量一般遠大於 R ,許多不同 key 會映射到同一個 reduce 任務中 )如果中間數據的數據量太大而無法放到內存中,需要使用外部排序 。
  6. reduce worker 會對排序后的中間數據進行遍歷,對於每個唯一的中間鍵,將該中間鍵和對應的中間值的集合傳入用戶提供的 Reduce 函數中,Reduce 函數生成的輸出會被追加到這個 reduce 任務分區的輸出文件中 ( 即一個 reduce 任務對應一個輸出文件,即 R 個輸出文件,存儲在 GFS 文件系統,需要的話可作為另一個 MapReduce 調用的輸入 )。
  7. 當所有的 mapreduce 任務完成后,master 會喚醒用戶程序 。此時,用戶程序會結束對 MapReduce 的調用 。

3.2 容錯

3.2.1 Woker 故障

master 會周期性地 ping 每個 worker ,若在一定時間內無法收到某個 worker 的響應,那么 master 將該 worker 標記為 fail

  • worker完成 的所有 map 任務都被重設為 idle 狀態,交由別的 worker 去執行這些 map 任務
  • worker正在執行map 任務或 reduce 任務重設為 idle 狀態,並等待重新調度

worker 上完成的 map 任務必須重新執行,因為 map 任務數據結果保存在 worker 的本地硬盤中,worker 無法訪問了,則輸出數據也無法訪問;該 worker 上完成的 reduce 任務不需要重新執行,因為輸出結果已存儲在全局文件系統中 。

3.2.2 Master 故障

目前的實現選擇中斷 MapReduce 計算,客戶端可檢查該 master 的狀態,並根據需要重新執行 MapReduce 操作 。

3.3 數據存儲位置

此模式是為了 節約網絡帶寬

將輸入數據( 由 GFS 系統管理 )存儲在集群中服務器的本地硬盤上,GFS 將每個文件分割為大小為 64MBBlock ,並且對每個 Block 保存多個副本(通常3個副本,分散在不同機器上)。master 調度 map 任務時會考慮輸入數據文件的位置信息,盡量在包含該相關輸入數據的拷貝的機器上執行 map 任務 。若任務失敗,master 嘗試在保存輸入數據副本的鄰近機器上執行 map 任務,以此來節約網絡帶寬 。

3.4 備用任務

此模式是為了緩解 straggler (掉隊者) 問題 ,即 :一台機器花費了異常多的時間去完成 最后幾個 mapreduce 任務,導致整個計算時間延長的問題 。可能是由於硬盤問題,可能是 CPU 、內存、硬盤和網絡帶寬的競爭而導致的 。

解決此問題的方法是:當一個 MapReduce 計算 接近完成 時,master 為正在執行中的任務執行 備用任務 ,當此任務完成時,無論是主任務還是備用任務完成的,都將此任務標記為完成 。這種方法雖然多使用了一些計算資源,但是有效降低了 MapReduce Job 的執行時間 。

3.5 Combiner 函數

某些情況下,每個 map 任務生成的中間 key 會有明顯重復,可使用 Combiner 函數map worker 上將數據進行部分合並,再傳往 reduce worker

Combiner 函數 和 Reduce 函數的實現代碼一樣,區別在於兩個函數輸出不同,Combiner 函數的輸出被寫入中間文件,Reduce 函數的輸出被寫入最終輸出文件 。

這種方法可以提升某些類型的 MapReduce 任務的執行速度( 如 word count 任務)。

3.6 臨時中間文件

對於有服務器故障而可能導致的 reduce 任務可能讀到部分寫入的中間文件 的問題 。可以使用 臨時中間文件 ,即 map 任務將運算結果寫入臨時中間文件,一旦該文件完全生成完畢,以原子的方式對該文件重命名 。


4. MapReduce 的優點

  • 適合PB級以上海量數據的離線處理

  • 隱藏了並行化、容錯、數據分發以及負載均衡等細節

  • 允許沒有分布式或並行系統經驗的程序員輕松開發分布式任務程序

  • 伸縮性好,使用更多的服務器可以獲得更多的吞吐量


5. MapReduce 的限制

  • 不擅長實時計算
  • 無法進行流式計算,因為 MapReduce 的輸入數據是靜態的
  • 無多階段管道,對於先后依賴的任務,MapReduce 必須把數據寫入硬盤,再由下一個 MapReduce 任務調用這些數據,造成了多余的磁盤 I/O

6. 相關問題總結

6.1 MapReduce 如何節約網絡帶寬

  1. 集群中所有服務器既執行 GFS ,也執行 MapReduceworker

  2. master 調度時會優先使 map 任務執行在存儲有相關輸入數據的服務器上

  3. reduce worker 直接通過 RPCmap worker 獲取中間數據,而不是通過 GFS ,因此中間數據只需要進行一次網絡傳輸

  4. R 遠小於中間 key 的數量,因此中間鍵值對會被划分到一個擁有很多 key 的文件中,傳輸更大的文件( 相對於一個文件擁有更少的 key )效率更高

6.2 MapReduce 如何獲得好的負載均衡

  1. 通過備用任務緩解 straggler 問題
  2. 使 task 數遠多於 worker 數,master 將空閑任務分給已經完成任務的 worker

7. 現狀

  • MapReduce 已被 Flume / FlumeJava 所取代
  • GFS 已被 ColossusBigTable 所取代


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM