[翻譯]MapReduce: Simplified Data Processing on Large Clusters


MapReduce: Simplified Data Processing on Large Clusters

MapReduce:面向大型集群的簡化數據處理

摘要

MapReduce既是一種編程模型,也是一種與之關聯的、用於處理和產生大數據集的實現。用戶要特化一個map程序去處理key/value對,並產生中間key/value對的集合,以及一個reduce程序去合並有着相同key的所有中間key/value對。本文指出,許多實際的任務都可以用這種模型來表示。

用這種函數式風格寫出的程序自動就擁有了在一個大的PC機集群上並行執行的能力。運行時系統會負責細節:切分輸入數據,在一組機器上調度執行程序,處理機器錯誤,以及管理所需的機器間通信。這允許不具備任何並行和分布式系統經驗的程序員也能輕松地利用一個大型分布式系統的資源。

我們的MapReduce實現運行在一個大型PC機集群上,且具有很好的擴展性:一個典型的MapReduce計算要在數千台機器上處理若干TB的數據。程序員可以很輕松的使用這一系統:目前已經實現的MapReduce程序數以百計,每天有上千個MapReduce作業運行在Google的集群上。

1 介紹

在過去的5年中,作者以及許多其他Google員工實現了數百個特定用途的計算過程,其中包括了處理大量的原始數據(抓取文檔、網絡請求日志等等),計算許多種類的衍生數據(倒排索引、網絡文檔圖結構的多種表示、單台主機抓取頁面數量的概要、指定日期頻次最高的請求集合等等)。大多數這樣的計算過程在概念上都很直接。但輸入數據量通常都很大,因此計算過程需要分布到數百或數千台機器上進行,才能保證過程在一個合理時間內結束。而為了處理計算並行化、數據分發和錯誤處理等問題而引入大量復雜的代碼則令原本簡單的計算過程變的晦澀難懂。

作為對這種復雜性的回應,我們設計了一種新的抽象,允許我們表達出原本簡單的計算過程,且將涉及並行、容錯性、數據分發和負載均衡的凌亂細節隱藏在一個函數庫中。我們的抽象受到了Lisp等函數式語言中的map和reduce原語的啟發。我們意識到我們大多數的計算都包含了在每個輸入的邏輯“記錄”上應用map操作,從而計算出一組中間key/value對的集合,然后再向共享同一個key的所有中間value應用reduce操作,從而適當地合並衍生數據。我們對用戶定義的map和reduce操作的使用允許我們輕松地將大型計算並行化,以及將再執行作為容錯性的主要機制。

本項工作的主要貢獻是一個簡單但功能強大的接口,允許自動實現並行化和大范圍的分布計算,和一個與此接口結合的實現,能在普通PC機集群上達到很高的性能。

第2部分描述了基本的編程模型並給出了幾個例子。第3部分描述了針對我們基於集群的計算環境而裁剪的MapReduce接口的一個實現。第4部分描述了我們覺得非常有用的一些編程模型的技巧。第5部分針對多種不同的任務,對我們的實現進行了性能測量。第6部分探索了在Google中MapReduce的應用,包括了我們在將其作為生產索引系統的重寫基礎的經驗。第7部分討論了相關的和未來要做的工作。

2 編程模型

計算過程就是輸入一組key/value對,再生成輸出一組key/value對。MapReduce庫的使用者用兩個函數來表示這個過程:map和reduce。

map由使用者編寫,使用一個輸入key/value對,生成一組中間key/value對。MapReduce庫將有着相同中間key I的中間value都組合在一起,再傳給reduce函數。

reduce也由使用者編寫,它接受一個中間key I和一組與I對應的value。它將這些value合並為一個可能更小的value集合。通常每個reduce調用只產生0或1個輸出value。中間value是通過一個迭代器提供給reduce函數的。這允許我們操作那些因為大到找不到連續存放的內存而使用鏈表的value集合。

2.1  示例

考慮一個問題:統計一個很大的文檔集合中每個單詞出現的次數。使用者能寫出與下面的偽代碼相似的代碼:

map(String key,String value):
    // key: 文檔名
    // value: 文檔內容
    for each word w in value:
        EmitIntermediate(w,"1");
  
reduce(Stringkey, Iterator values):
    // key: 一個單詞
    // value: 計數值列表
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

map函數將每個單詞與出現次數一同輸出(本例中簡單的輸出“1”)。reduce函數將針對某個特定詞輸出的次數都合並相加。

另外,使用者要寫代碼填充一個符合MapReduce規格的對象,內容包括輸入和輸出文件的名字,以及可選的調節參數。之后使用者調用MapReduce函數,將指定的對象傳進去。用戶代碼會與MapReduce庫(C++實現)鏈接到一起。附錄A包括了這個例子的全部程序文本。

2.2  類型

盡管前面的偽代碼寫成了用字符串進行輸入輸出,但從概念上講用戶提供的map和reduce函數是關聯着類型的:

map

(k1, v1)

→ list(k2, v2)

reduce

(k2, list(v2))

→ list(v2)

也就是說,輸入的key和value與輸出的key和value的域不同。進一步說,中間的key和value與輸出的key和value的域相同。

我們的C++實現使用字符串與用戶定義的函數交互,而將字符串與相應類型的轉換留給用戶代碼完成。

2.3  更多例子

這里例舉了一些有趣的程序,它們都可以很輕松的用MapReduce模型表達。

分布式Grep:map函數在匹配到給定的pattern時輸出一行。reduce函數只是將給定的中間數據復制到輸出上。

URL訪問頻次統計:map函數處理網頁請求的日志,對每個URL輸出〈URL, 1〉。reduce函數將相同URL的所有值相加並輸出〈URL, 總次數〉對。

倒轉Web鏈接圖:map函數在source頁面中針對每個指向target的鏈接都輸出一個〈target, source〉對。reduce函數將與某個給定的target相關聯的所有source鏈接合並為一個列表,並輸出〈target, list(source)〉對。

每個主機的關鍵詞向量:關鍵詞向量是對出現在一個文檔或一組文檔中的最重要的單詞的概要,其形式為〈單詞, 頻率〉對。map函數針對每個輸入文檔(其主機名可從文檔URL中提取到)輸出一個〈主機名, 關鍵詞向量〉對。給定主機的所有文檔的關鍵詞向量都被傳遞給reduce函數。reduce函數將這些關鍵詞向量相加,去掉其中頻率最低的關鍵詞,然后輸出最終的〈主機名, 關鍵詞向量〉對。

倒排索引:map函數解析每個文檔,並輸出一系列〈單詞, 文檔ID〉對。reduce函數接受給定單詞的所有中間對,將它們按文檔ID排序,再輸出〈單詞, list(文檔ID)〉對。所有輸出對的集合組成了一個簡單的倒排索引。用戶可以很輕松的擴展這個過程來跟蹤單詞的位置。

分布式排序:map函數從每條記錄中提取出key,並輸出〈key, 記錄〉對。reduce函數不改變這些中間對,直接輸出。這個過程依賴於4.1節介紹的划分機制和4.2節介紹的排序性質。

3 實現

許多不同的MapReduce的實現都是可行的。選擇哪一個要取決於環境。例如,一種實現可能適合於小型的共享內存機器,一種實現可能適合於大型的NUMA多處理器機器,而另一種則適合於更大型的聯網機器集。

本部分描述的實現主要面向Google內部廣泛使用的計算環境:大型的商用PC機集群,互相之間用交換式以太網連接。我們的環境是:

  1. 主要使用的機器為雙核X86處理器,運行Linux系統,每台機器的內存從2GB到4GB不等。

  2. 使用的都是商用網絡硬件設備——在機器層面上通常從100Mbps到1Gbps不等,但平均起來要比總帶寬的一半少很多。

  3. 集群中擁有數百或數千台機器,因此機器錯誤經常出現。

  4. 每台機器都使用廉價的IDE硬盤來提供存儲功能。我們使用一種內部開發的分布式文件系統來管理這些磁盤上的數據。這個文件系統通過復制的方法在不可靠的硬件之上提供了實用性與可靠性。

  5. 用戶向一個調度系統提交作業。每個作業包括了一個任務集,會由調度器調度到集群內可用的一組機器上。

3.1  執行過程概述

通過自動將輸入數據切分為M塊,map調用分布在多台機器上進行。輸入划分可以在不同的機器上並行執行。reduce調用是通過一個划分函數(例如hash(key) mod R)將中間key空間划分為R塊來分布運行。划分的塊數R和划分函數都由用戶指定。

QQ截圖20131116105114

圖1展示了我們的實現中MapReduce操作的整體流程。當用戶程序調用MapReduce函數時,會發生下面一系列動作(圖1中的標號與下面列表順序相同):

  1. 用戶程序中的MapReduce庫首先將輸入文件切分為M塊,每塊的大小從16MB到64MB(用戶可通過一個可選參數控制此大小)。然后MapReduce庫會在一個集群的若干台機器上啟動程序的多個副本。

  2. 程序的各個副本中有一個是特殊的——主節點,其它的則是工作節點。主節點將M個map任務和R個reduce任務分配給空閑的工作節點,每個節點一項任務。

  3. 被分配map任務的工作節點讀取對應的輸入區塊內容。它從輸入數據中解析出key/value對,然后將每個對傳遞給用戶定義的map函數。由map函數產生的中間key/value對都緩存在內存中。

  4. 緩存的數據對會被周期性的由划分函數分成R塊,並寫入本地磁盤中。這些緩存對在本地磁盤中的位置會被傳回給主節點,主節點負責將這些位置再傳給reduce工作節點。

  5. 當一個reduce工作節點得到了主節點的這些位置通知后,它使用RPC調用去讀map工作節點的本地磁盤中的緩存數據。當reduce工作節點讀取完了所有的中間數據,它會將這些數據按中間key排序,這樣相同key的數據就被排列在一起了。同一個reduce任務經常會分到有着不同key的數據,因此這個排序很有必要。如果中間數據數量過多,不能全部載入內存,則會使用外部排序。

  6. reduce工作節點遍歷排序好的中間數據,並將遇到的每個中間key和與它關聯的一組中間value傳遞給用戶的reduce函數。reduce函數的輸出會寫到由reduce划分過程划分出來的最終輸出文件的末尾。

  7. 當所有的map和reduce任務都完成后,主節點喚醒用戶程序。此時,用戶程序中的MapReduce調用返回到用戶代碼中。

成功完成后,MapReduce執行的輸出都在R個輸出文件中(每個reduce任務產生一個,文件名由用戶指定)。通常用戶不需要合並這R個輸出文件——他們經常會把這些文件當作另一個MapReduce調用的輸入,或是用於另一個可以處理分成多個文件輸入的分布式應用。

3.2  主節點數據結構

主節點維持多種數據結構。它會存儲每個map和reduce任務的狀態(空閑、處理中、完成),和每台工作機器的ID(對應非空閑的任務)。

主節點是將map任務產生的中間文件的位置傳遞給reduce任務的通道。因此,主節點要存儲每個已完成的map任務產生的R個中間文件的位置和大小。位置和大小信息的更新情況會在map任務完成時接收到。這些信息會被逐步發送到正在處理中的reduce任務節點處。

3.3  容錯性

既然MapReduce庫是為了幫助使用成百上千台機器處理數量非常大的數據的,它就必須能夠優雅地承受機器錯誤。

工作節點錯誤

主節點周期性的ping每個工作節點。如果工作節點在一定時間內沒有回應,主節點就將它標記為已失敗。這個工作節點完成的任何map任務都被重置為空閑狀態,並可被調度到其它工作節點上。同樣地,失敗的工作節點上正在處理的任何map或reduce任務也被重置為空閑狀態,允許被調度。

失敗節點上已完成的map任務需要重執行的原因是它們的輸出存儲在失敗機器的本地磁盤上,因此無法訪問到了。已完成的reduce任務不需要重執行,因為它們的輸出存儲在了一個全球文件系統上。

當一個map任務先被A節點執行過,隨后又被B節點重執行(A節點已失敗),所有執行reduce任務的工作節點都能收到重執行的通知。任何沒有讀取完A節點數據的reduce任務都會從B節點讀取數據。

MapReduce可以彈性應對大范圍的工作節點失敗。例如,在一次MapReduce操作期間,運行系統上的網絡維護導致了一組約80台機器在同一時間無法訪問,持續了數分鍾。MapReduce主節點只是簡單的重執行了已由無法訪問的機器完成的任務,並繼續向前執行,最終完成了這次MapReduce操作。

主節點錯誤

一種簡單的方法是令主節點定期將上面描述的數據結構保存為恢復點。如果主節點任務失敗,就可以從上一個恢復點狀態啟動一個新的程序副本。但是給定的條件是只有一個主節點,它也不太可能失敗;因此我們當前的實現會在主節點失敗時中止MapReduce計算 。客戶可以檢查到這一情況,並在他們需要時重啟MapReduce操作。

出現故障時的語義

當用戶提供的map和reduce操作對於它們輸入的值都是確定性的,我們的分布式實現產生的輸出值就如同將整個程序分成一個不間斷的串行執行過程一樣。

為了實現這個性質,我們依賴於map和reduce任務輸出結果的提交是原子的。每個處理中的任務都會將它的輸出寫入私有的臨時文件中。一個reduce任務產生一個這樣的文件,而一個map任務則產生R個這樣的文件(每個reduce任務一個)。當map任務完成時,工作節點發送給主節點的消息中帶有R個臨時文件的名字。如果主節點收到了一個來自已完成節點的完成消息,它就會忽略這個消息。否則,主節點會將R個文件的名字記錄在相應的數據結構中。

當reduce任務完成時,工作節點會執行原子性的更名操作,將臨時輸出文件更名為最終輸出文件。如果相同的reduce任務在多個機器上執行,就會有多個更名調用應用在相同的最終輸出文件上。我們依賴於由底層文件系統提供的原子更名操作,才能保證最終的文件系統中只包含由其中一個reduce執行產生的數據。

我們的絕大多數map和reduce操作都是確定性的,這種情況下我們的語義和一個串行執行過程是等同的,這也使程序員很容易推出他們程序的行為。當map和reduce操作有不確定性時,我們提供較弱但仍然合理的語義。當存在不確定的操作時,某個reduce任務R1的輸出等價於一個不確定程序的串行執行輸出。但某個reduce任務R2的輸出可能符合這個不確定程序的另一個串行執行輸出。

考慮map任務M和reduce任務R1、R2。令e(Ri)為Ri的已提交的執行結果(只執行一次)。此時弱語義生效,因為e(R1)可能讀取了M的一次輸出,而e(R2)則可能讀取了M的另一次輸出。

 

3.4  局部性

在我們的計算環境中,網絡帶寬是一種比較稀缺的資源。我們利用下面的事實來節省帶寬:輸入數據(由GFS管理)就存儲在組成集群的機器的本地磁盤上。GFS將每個文件分成64MB大小的區塊,每塊復制若干份(通常為3份)存儲到不同的機器上。MapReduce主節點會把輸入文件的位置信息考慮進去,並嘗試將map任務分配到保存有相應輸入數據的機器上。如果失敗的話,它會試圖將map任務調度到臨近這些數據的機器上(例如與保存輸入數據的機器處於同一網關的工作節點)。當在一個集群的相當一部分節點上運行MapReduce操作時,大多數輸入數據都是本地讀取,並不消耗網絡帶寬。

3.5  任務粒度

如上所述,我們將map階段分成M份,將reduce階段分成R份。理想情況下,M和R應該比工作節點機器的數量大很多。每個工作節點處理很多不同的任務,可以增強動態負責均衡能力,也能加速有工作節點失敗時的恢復情況:失敗節點已經完成的map任務有很多的時候也能傳遞給其它所有工作節點來完成。

在我們的實現中M和R的數量有一個實際的上限:如上所述,主節點必須做O(M+R)的調度決定以及在內存中保持O(M*R)個狀態。(但是內存使用的常數項很小:O(M*R)個狀態中每個map/reduce任務對只需要差不多1字節數據。)

進一步分析,R通常由用戶指定,因為每個reduce任務都會產生一個獨立的輸出文件。在實踐中我們傾向於這樣選擇M,即可以將每個單獨的任務分成16-64MB大的輸入數據(此時上面所說的局部性優化效果最好),同時我們令R為待使用的工作節點數量較小的整數倍。我們經常使用M=200000,R=5000,使用2000台機器來運行MapReduce計算。

3.6  備用任務

導致MapReduce操作用時延長的一個常見原因是出現了“落后者”:某台機器在完成最后的一個map或reduce任務時花費了反常的漫長時間。很多原因都會導致落后者的產生。例如,一台磁盤損壞的機器可能會遭遇頻繁的可校正錯誤,導致它的讀取性能從30MB/s降至1MB/s。集群調度系統可能已經調度了其它任務到這台機器,導致它在執行MapReduce代碼時因為CPU、內存、本地磁盤或網絡帶寬的競爭而更加緩慢。最近我們遇到的一個問題是機器的初始化代碼有一個bug,導致處理器緩存被禁用:受影響的機器上的計算速度下降了超過100倍。

我們有一個通用的機制來減輕落后者問題。當MapReduce操作接近完成時,主節點會將仍在處理中的剩余任務調度給其它機器作備用執行。原本的執行和備用執行中的任一個完成時都會將對應任務標記為已完成。我們已經調整過這個機制,使它因這個機制而增加的計算資源消耗通常只有一點點。我們已經觀察到這一機制有效地減少了大型MapReduce操作花費的時間。例如,5.3節中的排序程序在禁用這一機制時要多花費44%的時間。

4 技巧

盡管僅僅用map和reduce函數提供的基本功能就足夠解決大多數需求了,我們還是發現了一些很有用的擴展。這些擴展將在本節進行描述。

4.1  划分函數

MapReduce的用戶指定他們想要的reduce任務/輸出文件的數量。通過划分函數可以將數據按中間key划分給各個reduce任務。我們默認提供了散列函數當作默認的划分函數(例如,hash(key) mod R)。通常這就能得出很平衡的划分結果了。但在有些情況下,用key的其它信息來划分數據也很有幫助。例如有時輸出的key都是URL,而我們想讓所有來自同一主機的項最后都在同一個輸出文件中。為了支持類似這樣的情況,MapReduce的用戶可以提供一個特殊的划分函數。例如,使用“hash(主機名(urlkey)) mod R”來解決上面的問題。

4.2  順序保證

我們保證在給定的划分中,中間key/value對是按增序排列的。這個順序保證使每個划分產生一個有序的輸出文件變得很容易,當輸出文件的格式需要支持高效的按key隨機訪問,或用戶需要輸出數據有序時,這一性質會非常有用。

4.3  合並函數

某些情況中,不同的map任務產生的中間key重復率非常高,而且用戶指定的reduce函數可進行交換組合。一個典型的例子就是2.1節中的單詞統計。單詞頻率符合齊夫分布(百度百科),因此每個map任務都會產生非常多的<the, 1>這樣的記錄。所有這些記錄都會通過網絡被發送給一個reduce任務,然后再通過reduce函數將它們相加,產生結果。我們允許用戶指定一個可選的合並函數,在數據被發送之前進行局部合並。

合並函數由每個執行map任務的機器調用。通常合並函數與reduce函數的實現代碼是相同的。它們唯一的區別就是MapReduce庫處理函數輸出的方式。reduce函數的輸出會寫到最終的輸出文件中,而合並函數的輸出會寫到中間文件中,並在隨后發送給一個reduce任務。

4.4  輸入和輸出類型

MapReduce庫支持多種不同格式輸入數據的讀取。例如,“text”模式將每行輸入當作一個key/value對:key是文件中的偏移,而value則是該行的內容。另一種支持的常見格式將key/value對按key排序后連續存儲在一起。每種輸入類型的實現都知道如何將它本身分成有意義的區間從而能分成獨立的map任務進行處理(例如text模式的區間划分保證了划分點只出現在行邊界處)。用戶也可以通過實現一個簡單的reader接口來提供對新的輸入類型的支持,盡管大多數用戶只是使用為數不多的幾種預定義輸入類型中的一種。

reader的實現不一定要提供從文件中讀數據的功能。例如,可以很容易的定義一個從數據庫讀記錄的reader,或是從映射在內存中的某種數據結構中。

類似的,我們也支持一組輸出類型來產生不同格式的數據,而用戶提供代碼去支持新的輸出類型也不難。

4.5  邊界效應

有些情況下,MapReduce的用戶發現從他們的map或reduce操作中產生一些額外的輔助文件很有幫助。我們依賴於應用作者來確保這樣的邊界效應是原子且冪等的。通常應用會寫一個臨時文件,並在文件生成完畢時將其原子的更名。

我們不提供在單一任務產生的多個輸出文件中原子的兩段提交。因此,如果一個任務產生多個輸出文件,且要求有跨文件的一致性,它必須是確定性的。這個限制在實踐中還沒有引起過問題。

4.6  略過壞記錄

有時用戶代碼中的bug會導致map或reduce函數一遇到特定的記錄就崩潰。這些bug會導致MapReduce操作無法完成。通常的應用措施是修復這些bug,但有時難以實現:也許bug存在於得不到源代碼的第三方庫中。同樣地,有時忽略一些記錄是可以接受的,例如在一個大數據集上做統計分析時。我們提供了一個可選的執行模式,在MapReduce庫檢測到確定會導致崩潰的記錄時路過它們從而繼續進度。

每個工作進程都要安裝一個信號處理程序來捕捉違規操作和總線錯誤。在調用用戶的map或reduce操作前,MapReduce庫將用於驗證的序列號保存在全局變量中。如果用戶代碼產生了信號,信號處理程序就將一個包含這個序列號的“最后一步”UDP包發送給MapReduce主節點。當主節點在某個記錄上發現了超過一個錯誤時,就表明下一次重執行相應的map或reduce任務時要跳過這個記錄。

4.7  本地執行

map或reduce函數的調試問題常常令人難以捉摸,因為實際的計算過程都發生在分布式系統上,經常包含數千台機器,而工作分配決策都由主節點動態產生。為了幫助調試、性能分析、小范圍測試,我們開發了MapReduce庫的一個替代實現,可以將MapReduce操作的全部工作在一台本地機器上順序執行。用戶擁有控制權,因此計算可以被限制在特定的map任務中。用戶調用他們的程序時加上一個特殊標志,就可以方便的使用任何有用的調試或測試工具。

4.8  狀態信息

主節點內置了一個HTTP服務器,可以將當前狀態輸出為一組網頁供用戶使用。狀態網頁能顯示計算的進度,例如有多少任務被完成,多少正在處理,輸入數據大小,中間數據大小,輸出數據大小,處理速度,等等。這些網頁還包含指向每個任務的stdout和stderr輸出文件的鏈接。用戶可以用這些數據來預測計算要花費多長時間,以及是否應該增加計算使用的資源。這些網頁也能用於在計算速率比預期慢很多時發現這一情況。

另外,頂層的狀態網頁還能顯示哪些工作節點失敗了,它們失敗時正在處理哪些map和reduce任務。當要在用戶代碼中確定bug時這些時間非常有用。

4.9  計數器

MapReduce庫提供了計數器機制,可以統計多種事件的發生次數。例如,用戶代碼可能想統計已處理的單詞總數,或被索引的德語文獻的數量等。

為了使用這一機制,用戶代碼需要創建一個有名的計數器對象,並在map和reduce函數的適當位置增加它的值。例如:

 

Counter *uppercase = GetCounter("uppercase");
   
map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

工作節點上的計數器值會定期發送給主節點(附在ping的回應里)。當MapReduce操作完成時,主節點會將運行成功的map和reduce任務發來的計數器值合並后返回給用戶代碼。當前的計數器值也會顯示在主節點的狀態網頁上,其它人可以看到實時的計算進度。在合並計數器值時,主節點會忽略同一個map或reduce任務的重復的結果,從而避免多次疊加。(重復執行可能發生在我們的備用任務和失敗節點重執行中。)

有些計數值是由MapReduce庫自動維護的,例如輸入key/value對已處理的數量和輸出key/value對產生的數量等。

用戶們觀察到這一機制在對MapReduce操作的智能檢查上很有幫助。例如,在一些MapReduce操作中,用戶代碼可能想要確認產生的輸出對的數量恰好與輸入對的數量相等,或是已處理的德語文獻占處理文獻總數的比例是在接受范圍內的。

5 性能

本節中我們會在一個大型集群上測量MapReduce在兩個計算上的性能。一個計算是在差不多1TB的數據中查找指定的模式。另一個計算則是排序1TB的數據。

這兩個程序能代表大量真實的MapReduce程序——一類程序是將數據從一種表示形式轉換為另一種,另一類程序則是從很大數量的數據集中提取出一小部分感興趣的數據。

5.1  集群配置

所有的程序都運行在由1800台機器組成的集群上。每台機器配有:2GHz的Intel Xeon處理器且支持超線程,4GB內存,2塊160GB的IDE硬盤,和1GB以太網連接。所有機器都安排在2層樹結構的網關內,根節點的可用帶寬加起來有100-200Gbps。所有的機器都處於相同的托管設施下,因此任意兩台機器間的往返通信時間都少於1ms。

除了4GB的內存,還有1-1.5GB的內存保留給了集群上運行的其它任務。程序運行的時間是一個周末的下午,此時大多數CPU、磁盤和網絡資源都空閑中。

5.2  Grep

grep程序要掃描1010個長度為100字節的記錄來尋找一個相當罕見的三字符模式(這個模式出現於92337個記錄中)。輸入被切分成64MB大小的部分(M = 15000),整個輸出為一個文件(R = 1)。

QQ截圖20131116105326

圖2顯示了隨時間變化的計算進度。Y軸是輸入數據掃描的速率。這個速率逐漸提升代表更多的機器被分配給MapReduce計算,並在分配的機器數達到1764台時超過了30GB/s。隨着map任務的結束,這個速率開始下降,並在計算開始后80s時降至0。整個計算過程共花費約150s。這包括了大約1分鍾的啟動開銷。啟動開銷是由於要把程序傳播到所有工作機器上,還包括GFS要打開1000個輸入文件和獲取局部優化所需信息而導致的延遲。

5.3  排序

排序程序要對10100個100字節的記錄進行排序(差不多1TB數據)。這個程序模仿了TeraSort測試程序。

排序程序只包括不到50行的用戶代碼。map函數一共3行,它從一個文本行中提取出10字節的排序key,再將key和原始文本行輸出為中間key/value對。我們使用了一個內置的Identity函數作為reduce函數。這個函數會將未更改的中間key/value輸出為結果。最終的排序后輸出被出到一組2路復制的GFS文件(例如,這個程序的輸出要寫2TB的數據)。

如前所述,輸入數據被分成若干個64MB大小的部分(M = 15000)。我們將已排序的輸出分成4000個文件(R = 4000)。划分函數使用key的首字節來將它分到R個文件中的一個。

此次測試中我們的划分函數使用了key分布的內建知識。在一個一般的排序程序中,我們會預先加一輪MapReduce操作,收集key的樣本,並使用key抽樣的分布來計算最終輸出文件的划分點。

QQ截圖20131116105404

圖3(a)是排序程序的正常執行過程。上邊的圖顯示了輸入讀取的速度。輸入速度的峰值為13GB/s,並在200秒后所有的map任務都完成時迅速下降至0。可以注意到這個速度要比grep的速度小。這是因為排序的map任務要在向本地磁盤寫中間文件上花費大約一半的時間和I/O帶寬。而grep中相應的中間輸出則可以忽略不計。

中間的圖顯示了數據從map節點通過網絡向reduce節點發送的速度。這個移動開始於第一個map任務完成。圖中的第一個駝峰出現在reduce任務第一次達到1700個時(整個MapReduce共分配到1700台機器,每台機器同時最多只運行一個reduce任務)。計算開始差不多300秒時,第一批reduce任務已經有部分完成,我們開始向剩余的reduce任務傳送數據。所有傳送過程在計算開始600秒后結束。

下面的圖是reduce任務向最終的輸出文件寫入已排序數據的速度。在第一個傳輸周期的結束與寫入周期的開始之間有一個延遲,因為此時機器正在排序中間數據。寫入速度在2-4GB/s下持續了一段時間。所有的寫操作在計算開始后850秒左右結束。包括啟動開銷的話,整個計算花費了891秒。這與TeraSort上目前公布的最佳記錄1057秒很接近。

一些要注意的事:因為我們的局部性優化,輸入速度要比傳播速度和輸出速度都快——大多數數據都讀自本地磁盤,繞開了我們帶寬相當有限的網絡。傳播速度要比輸出速度高,因為輸出階段要寫兩份已排序的數據(因為可靠性和實用性的考慮)。我們寫兩份輸出是因為這是我們的底層文件系統針對可靠性和實用性提供的機制。如果底層文件系統使用擦除代碼而不是復制,那么寫數據需要的網絡帶寬就會減少。

5.4  備用任務的影響

在圖3(b)中顯示的是禁用了備用任務情況下排序程序的執行過程。這種情況下的執行過程與圖3(a)中的很相似,但在末尾處有很長一段時間幾乎沒有任何的寫操作發生。在960秒后,只有5個reduce任務還沒有完成。但這最后的幾個任務直到300秒后才結束。整個計算花費了1283秒,比正常情況多花費44%的時間。

5.5  機器失敗

在圖3(c)中顯示了有機器失敗情況下的排序程序執行過程。在計算開始幾分鍾后,我們有意的殺掉了1746個工作節點中的200個。底層集群調度器立即在這些機器上重啟了新的工作進程(只有進程被殺掉了,機器還在正常運行中)。

工作節點的死亡顯示為一個負的輸入速度,因為一些已經完成的map任務消失了(因為對應的map節點被殺掉了)需要重新完成。這些map任務的重執行很快就會發生。整個計算過程在933秒后結果,包括了啟動開銷(相比正常執行時間只增加了5%)。

6 經驗

我們在2003年2月寫出了MapReduce庫的第一個版本,並在2003年8月進行了非常大的改進,包括了局部性優化、各工作機器上任務執行的動態負載平衡等。從那時開始,我們愉快地驚訝於MapReduce被如此廣泛地應用在我們工作中遇到問題上。它已被用於Google的很多領域中,包括:

  • 大規模機器學習問題;

  • 針對Google News和Froogle產品的集群問題;

  • 用於產生流行需求報告的數據提取(例如Google Zeitgeist);

  • 針對新的嘗試和產品的網頁屬性提取(例如,從大量的定位搜索中提取出地理位置信息);

  • 大規模的圖計算。

QQ截圖20131116105431

圖4顯示了在我們的源代碼管理系統中進行了登記的MapReduce程序的數量隨時間的顯著增長,從2003年初的0到2004年9月底的接近900個不同版本。MapReduce取得如此成功的原因是它令花半小時時間寫出一個簡單的程序並運行在一千台機器上成為了可能,這極大地加速了開發和原型實現循環。進一步說,它允許沒有分布式與並行系統經驗的程序員也能輕松地利用大量的資源。

QQ截圖20131116105500

在每項作業的結尾,MapReduce庫會將作業花費的計算資源統計寫入日志。在表1中,我們能看到Google在2004年8月運行的MapReduce作業的一部分統計情況。

6.1  大規模索引

迄今為止我們的一個最重要的MapReduce應用是完全重寫了生產索引系統,它負責產生用於Google網絡搜索服務的數據結構。索引系統把由我們的爬取系統檢索的一個大的文檔集合作為輸入,並存儲為一組GFS文件。這些文檔的原始文本數據超過了20TB。索引進程分成了5-10個MapReduce操作運行。使用MapReduce(而不是之前版本使用的ad-hoc分布系統)提供了許多好處:

  • 索引代碼更簡單,更小,也更易懂,因為處理容錯性和分布與並行的代碼都被隱藏在MapReduce庫中了。例如,其中一個階段的計算在使用了MapReduce后從差不多3800行C++代碼降到了700行代碼。

  • MapReduce庫的性能足夠好,因此我們可以令概念上無關的計算過程相互分離,而不需要為了避免額外的數據處理而將它們混合在一起。這減小了修改索引進程的難度。例如,在我們的舊索引系統中進行一個修改要花費幾個月時間,而在新系統中只需要幾天時間。

  • 索引進程更容易去操作,因為大多數由機器失敗、部分機器緩慢和網絡暫時中斷引起的問題都由MapReduce庫自動處理了,不需要操作者干預。此外,通過向集群中增加機器,可以很容易地增強索引進程的性能。

7 相關工作

許多系統都提供了約束好的程序模型,並利用這些約束自動的進行並行計算。例如,一個組合函數可以在N個處理器上使用並行前綴計算用log N的時間計算出包含N個元素的數組的所有前綴。MapReduce可以被認為是基於來自我們在真實世界中的大型計算的經驗,對這些模型的簡單化和提煉。更重要的是,我們提供了可以擴展到上千個處理器上使用的容錯機制。作為對比,大多數並行處理系統只能在小規模下實現,並將處理機器錯誤的細節留給程序員去完成。

Bulk Synchronous Programming和一些MPI原語提供了更高層次的抽象,更容易寫出並行程序。這些系統和MapReduce的一個關鍵區別是MapReduce利用了一個受約束的程序模型去自動並行化用戶程序,並提供透明的容錯機制。

我們的局部性優化受到了諸如活動磁盤等技術的啟發,即將計算推給靠近本地磁盤的處理單元,來減少要跨過I/O子系統或網絡發送的數據總量。我們用運行在直接連接了少量磁盤的商用處理器上替代了直接運行在磁盤控制處理器上,但通用的方法是相似的。

我們的備用任務機制與Charlotte系統上的eager調度機制類似。簡單的eager調度系統有一個缺點,就是如果某個任務導致了多次失敗,整個計算都會失敗。我們通過自己的略過壞記錄的機制解決了一些這類問題。

MapReduce的實現依賴於一個內部的集群管理系統,它負責在一組數量很多的共享機器上分布運行用戶任務。盡管不是本文的重點,這個集群管理系統與Condor等其它系統在原理上是類似的。

排序機制作為MapReduce庫的一部分,與NOW-Sort操作是類似的。源機器(map節點)切分待排序的數據並將其發送給R個reduce節點。每個reduce節點在本地排序它的數據(盡量存放在內存中)。當然NOW-Sort沒有用戶定義的map和reduce函數,這些使我們的庫被廣泛應用。

River提供了一個編程模型,通過在分布式隊列上發送數據來處理通信。類似於MapReduce,River系統試圖提供提供好的平均情況性能,即使存在因不統一的硬件或系統擾動而導致的不一致性。River通過小心的調度磁盤和網絡傳輸來實現平衡的完成時間,從而達到這一目標。MapReduce使用了不同的方法。通過約束編程模型,MapReduce框架能夠將問題划分成大量細粒度的任務。這些任務被動態調度給可用的工作節點,因此更快的節點會處理更多的任務。這個約束的編程模型同樣允許我們在臨近作業結束時將剩余的任務重復調度執行,這顯著的減少了存在不一致時(例如有緩沖或停頓的機器)的完成時間。

BAD-FS有着與MapReduce非常不同的編程模型,而且不像MapReduce,它面向跨越廣域網的作業執行。但是,它與MapReduce有着兩個本質上的相同點:

  1. 兩個系統都使用了備用執行來恢復因錯誤而導致的數據丟失。

  2. 兩個系統都使用了局部感知調度來減少在擁堵的網絡連接上發送的數據量。

TACC被設計為一個高度可用的網絡服務的簡化結構系統。類似於MapReduce,它依賴於重執行機制來實現容錯性。

8 結論

MapReduce編程模型已經成功的用於Google的許多不同的用途中。我們將它的成功歸功於幾個原因。首先,該模型即使是對於沒有並行和分布式系統經驗的程序員也是很易於使用的,因為它隱藏了並行化、容錯機制、局部性優化、以及負載平衡的細節。其次,很多種類的問題都很容易表示成MapReduce計算。例如,MapReduce被用於為Google的網絡搜索服務的數據產生、排序、數據挖掘、機器學習、以及許多其它系統。第三,我們已經開發了一個MapReduce的實現,可以擴展到包含上千台機器的大型集群。該實現可以高效使用這些機器的資源,因此適合於Google遇到的很多大型計算問題。

我們從這項工作中學到了很多東西。首先,約束這個編程模型令並行和分布式計算,以及令這些計算可容錯,變得簡單了。其次,網絡帶寬是一種稀缺資源。我們系統中的很多優化都因此針對減少通過網絡發送的數據總量:局部性優化允許我們從本地磁盤讀,同時將中間文件寫入本地磁盤也節省了網絡帶寬。第三,備用執行可以用於減小緩慢的機器的影響,及應對機器失敗和數據丟失。

附錄A 詞頻統計源代碼

 

 

 

#include "mapreduce/mapreduce.h"

// User's map fuction
class WordCounter: public Mapper {
public:
	virtual void Map(const MapInput &input) {
		const string &text = input.value();
		const int n = text.size();
		for (int i = 0; i < n; ) {
			// Skip past leading whitespace
			while ((i < n) && isspace(text[i]))
				++i;
			
			// Find word end
			int start = i;
			while ((i < n) && !isspace(text[i]))
				++i;
				
			if (start < i)
				Emit(text.substr(start, i-start), "1");
		}
	}
};
REGISTER_MAPPER(WordCounter);

// User's reduce function
class Adder: public Reducer {
	virtual void Reduce(ReduceInput *input) {
		// Iterate over all entries with the
		// same key and add the values
		int64_t value = 0;
		while (!input->done()) {
			value += StringToInt(input->value());
			input->NextValue();
		}
		
		// Emit sum for input->key()
		Emit(IntToString(value));
	}
};
REGISTER_REDUCER(Adder);

int main(int argc, char **argv) {
	ParseCommandLineFlags(argc, argv);
	
	MapReduceSpecification spec;
	
	// Store list of input files into "spec"
	for (int i = 1; i < argc; ++i) {
		MapReduceInput *input = spec.add_input();
		input->set_format("text");
		input->set_filepattern(argv[i]);
		input->set_mapper_class("WordCounter");
	}
	
	// Specify the output files:
	//    /gfs/test/freq-00000-of-00100
	//    /gfs/test/freq-00001-of-00100
	//    ...
	MapReduceOutput *out = spec.output();
	out->set_filebase("/gfs/test/freq");
	out->set_num_tasks(100);
	out->set_format("text");
	out->set_reducer_class("Adder");
	
	// Optional: do partial sums within map
	// tasks to save network bandwidth
	out->set_combine_class("Adder");
	
	// Tuning parameters: use at most 2000
	// machines and 100MB of memory per task
	spec.set_machines(2000);
	spec.set_map_megabytes(100);
	spec.set_reduce_megabytes(100);
	
	// Now run it
	MapReduceResult result;
	if (!MapReduce(spec, &result))
		abort();
		
	// Done: 'result' structure contains info
	// about counters, time taken, number of
	// machines used, etc.
	
	return 0;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM