初識分布式計算:從MapReduce到Yarn&Fuxi


    這些年,雲計算、大數據的發展如火如荼,從早期的以MapReduce為代表的基於文件系統的離線數據計算,到以Spark為代表的內存計算,以及以Storm為代表的實時計算,還有圖計算等等。只要數據規模到了一定的程度,都需要依賴分布式計算來實時或者離線做出決策。雖然本人並未從事相關工作,但是了解一下還是好的。

  MapReduce這個詞一度是分布式計算的代名詞,至少代表了離線計算這一大類大數據編程范式。當提到這個詞,可能是指google的論文,或者hadoop的mr實現,也或者是這種編程范式。在本文中,除非特別說明,都是指Google 2004年的論文《MapReduce: Simplified Data Processing on Large Clusters》,這篇論文非常通俗易懂,也沒有復雜的概念和算法,值得一讀。

  在論文中,MapReduce既是一種分布式編程模型(programming model);又是該模型的運行時環境,該環境是為這種編程模型所量身定做的。

MapReduce is a programming model and an associated implementation for processing and generating large data sets.  

MapReduce編程模型

  前面說MapReduce論文比較容易讀懂,原因就是MapReduce的思想很簡單。MapReduce分為兩個階段:Map, Reduce。 Map就是把問題划分成獨立的組成部分,每一個Map(計算過程)處理獨立的一部分數據,分而治之,逐個擊破; Reduce就是對Map的結果做一個聚合。

  程序員學習每一門語言都是從Hello World開始,分布式計算也有“Hello World”, 那就是WordCount。WordCount就是統計一段文本中每個單詞的出現次數,具體計算過程就是先把文本分成M份數據,通過M個並行的map function計算出每一份數據中每個單詞的次數,生成結果是一個<key, value>對,key是單詞,value是該單詞的數量;而reduce function將所有map function的結果按照key進行聚合(相加),結果就是整個文本中每個單詞的數目。

  我覺得下面這個圖更加形象(來源見水印):

  

  在論文中提到,MapReduce來自函數式編程(如Lisp)的兩個算子:map、reduce。函數式編程是與面向對象編程並列的一種編程范式,函數式編程中,函數是一等公民,且支持高階函數,即以函數作為另一個函數的參數。即使是面向對象的編程語言,也提供函數式編程機制,比如C++中的std::for_each,而對於筆者比較熟悉的Python,也有兩個同名、同含義的函數map reduce。下面是python代碼

>>> lst = [1, 2, 3, 5, 6]
>>> new_lst = map(lambda e: 2 * e, lst)
>>> new_lst
[2, 4, 6, 10, 12]
>>>
>>> reduced = reduce(lambda total, e : total + e, lst, 0)
>>> reduced
17 

  指的一提的是,函數式編程的無狀態特性,即函數的運行結果只依賴於輸入,且不會對外界的環境產生引用,只有輸入相同,輸出就是確定的,這個特性非常重要,因為在MapReduce編程模型中,會有同一份數據的重復計算,后面會提到。

  還有,從對map過程的介紹不難理解,MapReduce只適合線性可並行的計算任務,子任務之間不能有依賴關系,只有這樣的計算任務,才能進行拆分,然后通過map並行計算,最終通過reduce進行結果的疊加。主要滿足這個條件,都適合用mapreduce模型來計算,如論文中提到的:

  • word counting:統計文本中每個單詞的數量
  • distributed grep:分布式grep
  • Count of URL Access Frequency:統計每個url的訪問頻率
  • Reverse Web-Link Graph: 引用關系圖
  • Inverted Index: 倒排索引

  不管是WordCount問題,還是上面“做菜”的例子,大家一看都很簡單,完全可以串行執行。但是當數據量到了TB、PB級別,用單台計算機的話得計算到猴年馬月,分布式就是要解決單台計算機無法完成的計算任務,每台計算機算一點(map過程),然后將結果匯總起來(reduce過程),這就是分布式計算。

MapReduce運行時環境

  前面提到,盡管MapReduce編程模型簡單直觀,但是由於數據量巨大的問題,必須要使用大量機器進行分布式計算,而分布式本事又是一個復雜且困難的問題。因此,運行時環境(框架)的作用就在於將程序員從分布式的繁枝縟節中解放出來,程序員只需關注本質的問題:數據的計算問題,而不用關心數據是在哪台機器上計算的、怎么計算的。而且由於MapReduce編程模型的普適性,只要將運行環境與計算公式隔離開,就可以復用運行環境,大大提高生產力。

  運行時環境是提供給需要進行大量數據計算的程序員使用的,后文統稱使用運行時環境的程序員為運行時環境的用戶

  以Google實現的MapReduce·運行環境為例,運行時環境負責輸出數據的拆分、將計算任務在一組節點(計算機)上調度執行、處理節點的故障、以及節點之間的RPC通信。用戶只需要提供兩個最基本的算子,map function,reduce function即可。

The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication.This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.

  下面是mapreduce框架

  

  框架中有兩類節點(也可以說是進程),MasterWorker,當用戶提交一個計算作業(Job)的時候,會啟動Job唯一對應的Master進程,Master進程負責整個Job的調度,包括分配worker的角色(map或者reduce)、worker計算的數據,以及向用戶返回結果等等。而Worker負責的具體計算稱之為task,在MapReduce框架下,worker按照計算的階段又分為map worker和reduce worker,worker在master獲取計算任務,然后在文件系統讀取數據進行運算,並將結果寫入到臨時文件或者持久化文件系統。

  一個Job的流程是這樣的

  1. MapReduce將待運算的數據分為M份,每一份的大小為16M或者64M(這個跟默認使用的分布式文件系統GFS有關),每一份數據稱之為一個split
  2. 啟動M個map worker,讀取相應的split,然后調用用戶的Map function,對數據進行運算
  3. map worker周期性將計算結果(稱之為中間結果)寫入到R份本地文件中的其中一份,R是reduce worker的數量,具體寫入哪一個臨時文件 規則由Partitioning function指定
  4. 當一個map worker計算任務完成的時候,將R份中間結果的位置通知master,master通知對應的reduce worker
  5. reduce worker根據中間結果的位置,通過rpc從map worker上獲取與自己對應的中間結果,進行計算,並將計算結果寫入到持久化分布式文件系統,
  6. 當所有map reduce worker的計算任務結束之后,通知用戶計算結果

  當然,一個mapreduce的結果並不一定直接給用戶,很有可能是一個鏈式(chain)計算,即將一個mapreduce的輸出當做另一個mapreduce的輸入

MapReduce優缺點

  google MapReduce以及hadoop的開源實現mapreduce(簡稱hadoop mr1)都是十余年前的產物,雖然已經不能滿足現在的很多運算需求,但是還是有很多很好的設計值得學習借鑒。

  data locality,即計算離需要的數據存儲越近越好,以盡量避免網絡傳輸

  fault tolerance,分布式系統中,節點故障是常態,運行環境需要對用戶透明地監控、處理故障。由於mapreduce編程模型的線性無狀態特性,對於某一個worker的故障,只需將計算任務給其他worker負責就行

  backup task,按照木桶定律,即一只水桶能裝多少水取決於它最短的那塊木板,在mapreduce中,運行最為緩慢的worker會成為整個Job的短板。運行環境需要監控到異常緩慢的worker,主動將其上的task重新調度到其他worker上,以便在合理的時間結束整個Job,提高系統的吞吐。

  partitioning function & combiner function,這是用戶可以提供的另外兩個算子,實時上也是非常有用的。map worker的中間結果,通過partitioning function分發到R(R為reducer的數目)位本地文件,默認為“hash(key) mod R”。而Combiner function是對每一個map worker的結果先進行一次合並(partial merge),然后再寫入本地文件,以減少數據傳輸,較少reduce worker的計算任務。

  local execution:本地執行,會提供一個最小化的本地運行環境,以便用戶調試、分析自己的代碼。這個功能筆者神深有感觸,多年前實習的時候使用MapReduce進行數據分析,由於需要排隊且自己的優先級比較低,所以一個Job要等個一天才有結果,如果因為一個拼寫錯誤就要重新跑的話簡直要崩潰。這個時候就可以先在本地環境用少量數據進行調試,驗證。

  

  在google 論文中,也有不足或者沒有考慮到的點。

  第一:master是單點,故障恢復依賴於周期性的checkpoint,不保證可靠性,因此發生故障的時候會通知用戶,用戶自行決定是否重新計算。

  第二:沒有提到作業(Job)的調度策略,運行時環境肯定是有大量的Job並發的,因此多樣且高效的調度策略是非常重要的,比如按優先級、按群組

  第三:並沒有提到資源(CPU、內存、網絡)的調度,或者說並不區分作業調度與資源調度。

  第四:沒有提到資源隔離與安全性,大量Job並發的時候,如何保證單個Job不占用過多的資源,如何保證用戶的程序對系統而言是安全的,在論文中並沒有提及

  第五:計算數據來源於文件系統,效率不是很高,不過本來就是用於離線任務,這個也不是什么大問題

yarn & fuxi

  上一章節的最后提到了mapreduce的一些缺陷,包括master單點,資源調度與任務調度職責不明確,只使用與離線數據處理等。隨着大數據分析的蓬勃發展,就需要更加多樣性的分布式編程范式,比如實時數據處理、內存計算、圖計算等等。編程范式的多樣化對運行時環境提出了更大的挑戰,即運行時環境需要更通用,以支持不同的編程模型,而不是像mapreduce框架那樣只支持mapreduce這種編程范式。不同的編程范式,或者說不同的計算任務,對資源(如CPU、內存)的需求是不同的,因此需要優秀的調度策略,在滿足應用的特殊需求的情況下,最大化利用資源,同時也需要做好任務之間的隔離,避免相互影響。

  在這種需求背景下,就產生了hadoop 2.0 ,主要就是分布式資源調度器Yarn,國內的話比較出名的就是阿里的fuxi系統。二者在架構上非常類似,下面做簡單介紹。

  需要注意的是,yarn只是hadoop2.0的資源調度器,只負責資源的調度。由於資源調度與任務相互隔離,因此yarn支持更多的分布式計算模型,包括MapReduce,Spark,Storm等。

  Yarn的架構如下:

  

  在Yarn中,有以下組件

  ResourceManager:資源管理器,接收用戶的請求,負載應用(application)的調度管理,啟動應用對應的ApplicationMaseter,並為每一個應用分配所需的資源

  NodeManager:框架agent,在每一個計算機節點上都有一個,用於本機上的Container,監控機器的資源使用情況,並向ResourceManager匯報

  ApplicationMaseter(圖中所有為App Mstr),每一個應用都有自己唯一的ApplicationMaseter,用於管理應用的生命周期,向ResourceMananger申請資源,監控任務對應的container

  Containner:具體任務task的計算單元,是一組資源的抽象,可用於以后實現資源的隔離

  其中,ResourceManager包含兩個重要的組件,scheduler和ApplicationManager。scheduler負責為各種應用分配資源,支持各種調度算法,如 CapacityScheduler、 FairScheduler。ApplicationManager負責接收用於的請求,啟動應用對應的ApplicationMaseter。

 

  阿里fuxi的架構與Yarn非常類似,如圖所示

  

  可以看到:

  fuxi master對應Resourcemanager

  tubo對應NodeManager 

  AppMaster對應ApplitionMast

  AppWorker對應Container

 

  關於阿里的fuxi分布式調度,在阿里雲大學和課堂在線都有視頻課程,包含了許多細節,如詳細的資源調度流程,各種調度策略,容錯機制,應用的隔離等,值得看一看。

references

MapReduce: Simplified Data Processing on Large Clusters

Apache Hadoop YARN

分布式系統開發——調度技術


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM