Abstract
MapReduce是一種編程模型和一種用來處理和產生大數據集的相關實現。用戶定義map函數來處理key/value鍵值對來產生一系列的中間的key/value鍵值對。還要定義一個reduce函數用來合並有着相同中間key值的中間value。許多現實世界中的任務都可以用這種模型來表達,就像下文所展示的那樣。
用這個風格編寫的程序可以自動並行地在集群上工作。運行時系統會自動處理例如切割輸入數據,在機器之間調度程序的執行,處理機器故障以及管理必要的機器間通信等細節問題。這可以讓那些對於並行分布式系統沒有任何經驗的程序員也能很簡單地利用起一個大的分布式系統的資源。
我們的MapReduce的實現運行在一個由大的商業機構成的集群當中並且是高度可擴展的:一個典型的MapReduce計算要在上千台機器中處理TB數量級的數據。程序員會覺得這個系統非常好用:已經有成千上萬的MapReduce程序被實現出來並且每天有上千個MapReduce任務運行在Google的集群上。
1 Introduction
在過去五年中,作者和許多Google的其他人已經實現了成百上千個用於特殊目的的計算程序用於處理大量的raw data,各種各樣的derived data。許多這種計算程序在概念上都是非常直接的。然而輸入的數據量往往很大,並且計算需要分布在成百上千台機器中為了在一個可接受的時間內完成任務。但是除了簡單的計算模型以外,我們需要大量復雜的代碼用來處理例如如何並行化計算、分發數據、處理故障等等問題。
為了解決這樣的復雜性,我們設計了一種新的抽象,它讓我們只需要表示出我們想要執行的計算模型,而將背后復雜的並行化,容錯,數據分發,負載平衡等等技術的實現細節隱藏在了庫中。我們這種新的抽象是受Lisp以及其他一些函數式編程語言中的map和reduce原語影響而來的。我們意識到許多的計算都需要對於輸入中的每個邏輯“記錄”進行map操作,為了計算一系列的中間鍵值對。然后還需要對所有共享同一個key的value進行reduce操作,從而能夠對派生的數據進行適當的組合。我們這種讓用戶自定義map和reduce操作的編程模型能夠讓我們簡單地對大量數據實現並行化,並且使用重新執行作為主要的容錯機制。
我們這項工作的主要共享是提供了一個簡單並且強大的接口能夠讓我們實現自動的並行化並且分布處理大規模的計算,同時該接口的實現能在大型的商用PC集群上獲得非常高的性能。
Section 2描述了基本的編程模型以及一些簡單的例子。Section 3描述了為我們的基於集群的計算環境量身定做的MapReduce接口。Section 4描述了一些我們認為有用的對於編程模型的改進。Section 5是對我們的實現在不同任務下的性能測試。Section 6 包含了MapReduce在Google內的使用情況,包括我們以它為基礎重寫我們的產品索引系統的經驗。Section 7討論了相關的工作以及未來的發展。
2 Programming Model
計算模型以一系列的鍵值對作為輸入並產生一系列的鍵值對作為輸出。MapReduce庫的用戶以“Map”和"Reduce"兩個函數來表達計算。
Map,是由用戶編寫的,獲取一個輸入對,並且產生一系列中間的鍵值對。MapReduce庫將那些具有相同的中間鍵I的中間值聚集在一起,然后將它們傳遞給Reduce函數。
Reduce函數同樣是由用戶編寫的,接收一個中間鍵I和該鍵對應的一系列的中間值。Reduce函數通過將這些值合並來組成一個更小的值的集合。通常每個Reduce函數只產生0個或1個輸出值。Reduce函數一般通過一個迭代器來獲取中間值,從而在中間值的數目遠遠大於內存容量時,我們也能夠處理。
2.1 Example
下面來考慮這樣一個問題:統計大量文檔中每一個單詞出現的次數。對此,用戶需要編寫類似於如下的偽代碼:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函數為在每一個單詞出現的時候,為它加上一個計數(在這個簡單的例子中就是加1)。Reduce函數對每個單詞的所有計數進行疊加。
另外,用戶需要用輸入輸出文件的名字,以及一個可選的tuning parameter去填充一個叫mapreduce specification 的對象。之后,用戶調用MapReduce函數,將定義的上述對象傳遞進去。用戶的代碼將和MapReduce庫相連(由C++實現)。Appendix A中有這個例子所有的代碼文檔。
2.2 Types
雖然在上述的偽代碼中輸入輸出都是字符串類型的,但事實上,用戶提供的Map和Reduce函數都是有相應類型的:
map (k1, v1) -> list(k2, v2)
reduce (k2, list(v2)) -> list(v2)
需要注意的是,輸入的key和value與輸出的key和value是不同的類型,而中間的key和value與輸出的key和value是相同的類型。我們的C++實現都是以字符串的形式和用戶代碼進行交互的,至於將字符串類型轉換成相應合適的類型的工作則由用戶代碼來完成了。
2.3 More Example
接下來是一些能夠簡單地用MapReduce計算模型進行表達的例子
Distributed Grep:Map函數獲取匹配提供的模式的行,Reduce函數只是簡單地將這些中間數據拷貝到輸出
Count of URL Access Frequency:Map函數處理web請求的日志,並且輸出<URL, 1>。Reduce函數將擁有相同URL的value相加,得到<URL, total count>對
Reverse Web-Link Graph:Map函數輸出<target, source>對,其中source所在的page都有連向target這個URL的鏈接。Reduce函數將給定target的所有的source URL連接起來,輸出<target, list(source)>對
Term-Vector per Host:一個term vector表示一系列<word, frequency>的鍵值對,word表示一篇或者一系列文章中出現的比較重要的單詞,frequency表示它們出現的次數。Map函數對於每篇輸入的文章輸出<hostname, term vector>鍵值對(其中hostname是從文章所在的URL中抽取出來的)Reduce函數獲取給定host的term vectors。它將這些term vectors累加起來,丟棄非頻繁出現的term,並產生一個最終的<hostname, term vector>對。
Inverted Index:Map函數對每篇文章進行處理,並輸出一系列的<word, document ID>對。Reduce函數接收給定word的所有鍵值對,對相應的document ID進行排序並且輸出<word, list<document ID>>對。所有輸出對的集合構成了一個簡單的倒排索引。用了MapReduce模型,對單詞位置的追蹤就變得非常簡單了。
Distributed Sort:Map函數從每個record中抽取出key,產生<key, record>鍵值對。Reduce函數只是簡單地將所有對輸出。這個計算模型依賴於Section 4.1中描述的划分技巧以及Section 4.2中描述的排序特性。
3 Implementation
對於MapReduce的接口,各種各樣不同的實現都是可能的。所有正確的實現都是基於應用環境的。比如,一種實現可能適合於小的共享內存的機器,另一種可能適合於大型的NUMA多處理器機器,甚至有的是為更大的互聯的機器集群設計的。
本節中描述的實現基於的是Google中最常用的計算環境:一個由大量商用PC機通過交換以太網互聯的集群。在我們的環境中:
(1)、機器通常都是x86的雙核處理器,其上運行Linux,每台機器擁有2-4G的內存
(2)、商用網絡硬件---通常是100 M/s或者1 G/s,但是綜合起來要小於平均帶寬
(3)、一個集群由成千上萬台機器組成,因此機器故障是常有的事
(4)、存儲由便宜的IDE磁盤提供,它們都與獨立的機器直接相連。一個內部研發的文件系統用於管理所有存儲於這些硬盤上的文件。該文件系統通過Replication在不可靠的硬件上提供了可用性和可靠性
(5)、用戶提交jobs給調度系統。每個job由一系列的task組成,並且由調度器分配到集群中一系列可用的機器上
3.1 Execution Overview
通過將輸入數據自動分割成M份,Map函數得以在多台機器上分布式執行。每一個輸入塊都能並行地在不同的機器上執行。通過划分函數(例如,hash(key) mod R)將中間鍵划分為R份,Reduce函數也能被分布式地調用。其中划分的數目R和划分函數都是由用戶指定的。
上圖1展示了在我們的實現中MapReduce全部的流程。當用戶程序調用MapReduce函數時,接下來的動作將按序發生(圖1中標記的數字與下面的數字是一一對應的):
(1)、用戶程序中的MapReduce庫首先將輸入文件划分為M片,每片大小一般在16M到64M之間(由用戶通過一個可選的參數指定)。之后,它在集群的很多台機器上都啟動了相同的程序拷貝。
(2)其中有一個拷貝程序是特別的----master。剩下的都是worker,它們接收master分配的任務。其中有M個Map任務和R個Reduce任務要分配。master挑選一個空閑的worker並且給它分配一個map任務或者reduce任務。
(3)、被分配到Map任務的worker會去讀取相應的輸入塊的內容。它從輸入文件中解析出鍵值對並且將每個鍵值對傳送給用戶定義的Map函數。而由Map函數產生的中間鍵值對緩存在內存中。
(4)、被緩存的鍵值對會階段性地寫回本地磁盤,並且被划分函數分割成R份。這些緩存對在磁盤上的位置會被回傳給master,master再負責將這些位置轉發給Reduce worker。
(5)、當Reduce worker從master那里接收到這些位置信息時,它會使用遠程過程調用從Map worker的本地磁盤中獲取緩存的數據。當Reduce worker讀入全部的中間數據之后,它會根據中間鍵對它們進行排序,這樣所有具有相同鍵的鍵值對就都聚集在一起了。排序是必須的,因為會有許多不同的鍵被映射到同一個reduce task中。如果中間數據的數量太大,以至於不能夠裝入內存的話,還需要另外的排序。
(6)、Reduce worker遍歷已經排完序的中間數據。每當遇到一個新的中間鍵,它會將key和相應的中間值傳遞給用戶定義的Reduce函數。Reduce函數的輸出會被添加到這個Reduce部分的輸出文件中。
(7)、當所有的Map tasks和Reduce tasks都已經完成的時候,master將喚醒用戶程序。到此為止,用戶代碼中的MapReduce調用返回。
當成功執行完之后,MapReduce的執行結果被存放在R個輸出文件中(每個Reduce task對應一個,文件名由用戶指定)。通常用戶並不需要將R個輸出文件歸並成一個。因為它們通常將這些文件作為另一個MapReduce調用的輸入,或者將它們用於另外一個能夠以多個文件作為輸入的分布式應用。
3.2 Master Data Structures
在master中保存了許多的數據結構。對於每個Map task和Reduce task,master都保存了它們的狀態(idle,in-progress或者是completed)以及worker所在機器的標識(對於非idle狀態的tasks而言)。
master相當於是一個管道,通過它Map task所產生的中間文件被傳遞給了Reduce task。因此,對於每一個已經完成的Map task,master會存儲由它產生的R個中間文件的位置和大小。當Map task完成的時候,master就會收到位置和大小的更新信息。而這些信息接下來就會逐漸被推送到處於in-progress狀態的Reduce task中。
3.3 Fault Tolerance
因為MapReduce庫的設計初衷是用成千上萬的機器去處理大量的數據,所以它就必須能用優雅的方式對機器故障進行處理。
Worker Failure
master會周期性地ping每一個worker。如果經過了一個特定的時間還未從某一個worker上獲得響應,那么master會將worker標記為failed。所有由該worker完成的Map task都被回退為idle狀態,因此能夠被重新調度到其他的worker上。同樣的,所有failed worker正在執行的Map task或者Reduce task也會被回退為idle狀態,並且被重新調度。
發生故障的機器上已經完成的Map task需要重新執行的原因是,它們的輸入是保存在本地磁盤的,因此發生故障之后就不能獲取了。而已經完成的Reduce task並不需要被重新執行,因為它們的輸出是存放在全局的文件系統中的。
當一個Map task開始由worker A執行,后來又由worker B執行(因為A故障了)。所有執行Reduce task的worker都會收到這個重新執行的通知。那些還未從worker A中讀取數據的Reduce task將會從worker B中讀取數據。
MapReduce對於大面積的機器故障是非常具有彈性的。例如,在一次MapReduce操作中,網絡維護造成了集群中八十台機器在幾分鍾的時間內處於不可達的狀態。MapReduce的master只是簡單地將不可達的worker機器上的工作重新執行了一遍,接着再繼續往下執行,最終完成了MapReduce的操作。
Master Failure
對於master,我們可以簡單地對上文所述的master數據結構做周期性的快照。如果一個master task死了,我們可以很快地根據最新的快照來重新啟動一個master task。但是,因為我們只有一個master,因此故障的概率比較低。所以,在我們的實現中如果master出現了故障就只是簡單地停止MapReduce操作。用戶可以檢測到這種情況,並且如果他們需要的話可以重新開始一次MapReduce操作。
Semantics in the Presence of Failures
如果用戶提供的Map和Reduce操作是關於輸入值的確定性函數,那么我們分布式的實現將會產生同樣的輸出,在整個程序經過沒有出現故障的順序執行之后。
我們依賴Map task和Reduce task原子性地提交輸出來實現上述特性。每一個正在執行的task都會將它的輸出寫到一個私有的臨時文件中。一個Reduce task產生一個這樣的文件,而一個Map task產生R個這樣的文件(每個Reduce work一個)。當一個Map task完成的時候,worker就會給master發送一個信息,,其中包含了R個臨時文件的名字。如果master收到了一個來自於已經完成了的Map task的完成信息,那么它就將它自動忽略。否則,將R個文件的名稱記錄到一個master數據結構中。
當一個Reduce task完成的時候,Reduce worker會自動將臨時輸出文件命名為最終輸出文件。如果同一個Reduce task在多台機器上運行,那么多個重命名操作產生的最終輸出文件名將會產生沖突。對此,我們依賴底層文件系統提供的原子重命名操作來保證最終文件系統中的數據來自一個Reduce task。
大多數的Map和Reduce操作都是確定性的,事實上,我們的語義等同於順序執行。因此這讓程序員非常容易地能夠解釋他們程序的行為。當Map和Reduce操作是非確定性的時候,我們提供較弱,但仍然合理的語義。在非確定性的操作中,對於一個特定的Reduce task R1的輸出是和非確定性程序順序執行產生R1產生的輸出是相同的。然而,對於另一個Reduce task R2,它的輸出對應於非確定性程序另一個順序執行的結果。
下面考慮Map task M和Reduce task R1和R2。讓e(Ri)表示Ri的執行結果。更弱的語義意味着,e(R1)可能從M的一次執行結果中讀取輸入,而e(R2)可能從M的另一次執行中讀取輸入。
3.4 Locality
網絡帶寬在我們的計算環境中是相對稀缺的資源。我們通過將輸入數據存儲在集群中每台機器的本地磁盤的方法來節省帶寬。GFS將輸入文件切分成64MB大小的塊,並且將每個塊的多份拷貝(通常為3份)存儲在不同的機器上。MapReduce的master獲取所有輸入文件的位置信息,然后將Map task調度到有相應輸入文件副本的機器上。當發生故障時,再將Map task調度到鄰近的具有該task輸入文件副本的機器(即在同一台交換機內具有相同數據的機器)。當在一個集群的大量機器上做MapReduce操作時,大多數的輸入數據都是從本地讀取的,而不用消耗帶寬。
3.5 Task Granularity
如上所述,我們將Map操作分成M份,Reduce操作分成R份。在理想的情況下,M和R的值應該要比集群中worker machine的數量多得多。讓一個worker同時進行許多不同的task有利於提高動態的負載均衡,同時在一個worker故障的時候能盡快恢復。許多已經完成的Map task也能盡快地傳播到其他所有的worker machine上。
在我們的實現中,M和R的大小是有一個實用范圍的。因為我們的master需要做O(M+R)個調度決定,並且還要在內存中保存O(M*R)個狀態。(但是內存使用的常數還是比較小的,O(M*R)個Map task/Reduce task 狀態對,每個的大小大概在一個字節)
另外,R通常受限於用戶,因為每個Reduce task的輸出都分散在不同的輸出文件中。事實上,我們會選擇M,因此每個輸入文件大概16MB到64MB的輸入文件(因此上文所述的局部性優化會達到最優)。而我們會讓R成為worker machine數量的一個較小的倍數。因此,我們通常在進行MapReduce操作時,將M設為200000,R設為5000,使用2000個worker machine。
3.6 Backup Tasks
“straggler”(落伍的士兵)的存在是拖慢整個MapReduce操作的通常的原因之一。所謂的"straggler"是指一台機器用了過長的時間去完成整個計算任務中最后幾個Map或者Reduce task。Straggler出現的原因有很多。比如一台機器上硬盤壞了,它就會經歷大量的可糾正錯誤,從而讓它的性能從30MB/s下降到1MB/s。集群的調度系統可能將其他task調度到該機器上,導致它執行MapReduce代碼的速度變慢很多,因為CPU,內存,本地磁盤,網絡帶寬的競爭加劇。我們最近遇到的一個問題是一台機器的初始化代碼有點問題,它會導致處理器的緩存被禁用,在這些受影響的機器上進行的計算速度會下降到原來的百分之一。
對此,我們有一個通用的機制用來緩解straggler的問題。當MapReduce操作接近結束的時候,master會將那些還在執行的task的備份進行調度執行。無論是原來的還是備份執行完成,該task都被標記為已完成。我們通過調整將該操作導致的計算資源消耗僅僅提高了幾個百分點。但是在完成大型的MapReduce操作時,卻讓整個執行時間下降了好多。例如,Section 5.3中所描述的排序算法在備份機制關閉的情況下,需要多消耗44%的時間。
4 Refinement
雖然對於大多數需求由Map和Reduce函數提供的功能已經足夠了,但是我們還是發現了一些有用的擴展。對它們的描述如下。
4.1 Partitioning Function
MapReduce用戶決定他們的Reduce task或者輸出文件的數目R。通過一個划分函數,根據中間鍵值將各個task的數據進行划分。默認的划分函數是通過哈希(比如,hash(key) mod R)。這通常會產生非常好的較為均衡的划分。但是在其他一些情況下,通過鍵值的其他函數來划分要更好一些。例如,有的時候輸出鍵值是一些URL,我們希望同一個host的內容能放在同一個輸出文件中。為了支持這種情況,MapReduce庫的用戶可以提供一個特殊的划分函數。例如,使用“hash(Hostname(urlKey)) mod R”作為划分函數,從而讓所有來自於同一個host的URL的內容都輸出到同一個輸出文件。
4.2 Ordering Guarantees
我們確保在一個給定的划分中,中間的鍵值對都按照鍵值的升序進行處理。這樣的處理順序確保了每一個划分產生一個排好序的輸出文件。這樣的話,如果輸出文件格式需要支持根據key進行有效的隨機查找會比較方便。同時,輸出的用戶也會覺得已經排好序的數據使用起來特別方便。
4.3 Combiner Function
在有些情況下,每個Map task都會產生大量的中間鍵的重復而用戶指定的Reduce函數是交互和關聯的。Section 2.1中的單詞統計就是一個很好的例子。因為單詞的出現頻率服從於Zipf分布,每個Map Task都會產生成百上千個<the, 1>這樣的記錄。所有這些記錄都會通過網絡被送到一個Reduce task中,並且由Reduce函數加在一起去產生一個數。我們允許用戶使用了可選的Cominer函數,用於在網絡傳輸之前部分地進行歸並操作。
Combiner函數在每個執行Map task的機器上執行。通常Combiner和Reduce函數使用的是相同的代碼。Reduce函數和Combiner函數唯一的不同是MapReduce庫如何處理函數的輸出。Reduce函數的輸出寫到最終的輸出文件中。而Combiner函數的輸出會被寫到一個最終將被送給Reduce task的中間文件中。
部分的合並操作能極大地加速某類特定的MapReduce操作。Appendix A包含了一個使用Combiner的例子。
4.4 Input and Output Types
MapReduce庫提供了對讀入數據文件多種的格式支持。例如,"text"格式的輸入將每一行作為鍵值對:key是文件內的偏移,value是該行的內容。另外一種比較常用的格式存儲一系列按照鍵進行排序的鍵值對。每一個輸出格式的實現都知道如何將自己進行合理的划分從而能讓不同的Map task進行處理(例如,text模式就知道將區域划分到以行為邊界)。用戶可以通過簡單地定義一個reader接口來提供一個新的輸入類型的實現。事實上,大多數用戶只使用了預定義輸入類型的很小一部分。
reader並不一定要從文件中讀取數據。例如,我們可以很容易地定義一個從數據庫,或者內存中映射的數據結構中讀取記錄的reader。
同理,我們也支持產生不同格式的輸出數據,用戶也能編寫新的輸出數據格式。
4.5 Side-effects
在有些情況下,MapReduce的用戶會很容易發現Map或者Reduce操作會產生一些輔助文件作為額外的輸出文件。我們依賴應用的編寫者去保證這些副作用是原子和冪等的。一般來說,應用會寫到一個臨時文件中,並且在它完全產生之后,通過一個原子操作將它重命名。
對於一個單一的task產生的多個輸出文件,我們不提供原子性的兩相提交支持。因此,產生多個輸出文件並且有跨文件一致性要求的task需要是確定性的。但是這樣的限制在實踐過程中並不是什么問題。
4.5 Skipping Bad Records
有時候,如果用戶的代碼中有bug的話,會導致Map或者Reduce操作在某些記錄上崩潰。這些bug會導致MapReduce操作的正常完成。對於這種情況,通常就是去修bug。不過有時候這是不可行的,也許bug是第三方庫造成的,而我們並不能得到它的源代碼。而且,有時候我們允許忽略掉一些記錄,例如在對一個大數據集做分析的時候。因此我們提供了一種可選的執行模式,當MapReduce庫檢測到一些記錄會造成崩潰時,就會主動跳過它們,從而保證正常地運行。
每一個worker進程都安裝了一個signal handler用於捕捉段錯誤和bug。在調用用戶的Map和Reduce操作之前,MapReduce庫會將參數的序號保存在一個全局變量中。如果用戶代碼產生了一個信號,signal handler就會傳輸一個參數含有序號的"last gasp"UDP包給MapReduce的master。當master在一個特定的記錄中發現了不知一次的錯誤,這表示在下一次執行相應的Map或者Reduce操作的時候一個將它跳過。
4.7 Local Execution
Map或者Reduce函數的調試問題是非常tricky的。因為實際的計算發生在分布式的系統中,通常由成百上千台機器組成,並且工作的分配由master動態執行。為了幫助調試,分析,以及小規模的測試,我們開發了另外一個MapReduce庫的實現,它能夠在本地機器上順序執行一個MapReduce操作的所有工作。它的控制交給用戶,因此計算可以被限定到制定的Map task中執行。用戶利用指定的flag啟動程序,然后就能非常簡單地使用任何它們覺得有用的調試或者測試工具了。
4.8 Status Information
master運行了一個內置的HTTP server並且暴露了一系列供人類使用的狀態頁。狀態頁會顯示程序的計算過程,例如已經完成了多少個task,還有多少個task正在執行,輸入的字節數,中間數據的字節數,輸出的字節數,以及處理速度等等。該頁還包含了指向各個task的標准錯誤和標准輸出鏈接。用戶可以利用這些數據來判斷計算會持續多長時間,以及計算是否需要添加更多的資源。這些頁面還能用來發現什么時候處理速度比預期地下降好多。
另外,頂層的狀態頁顯示了那些worker出錯了,以及在它們出錯時正在執行哪些Map和Reduce task。這些信息在診斷用戶代碼出現的bug時是非常有用的。
4.9 Counter
MapReduce庫提供了一個叫counter的設施用於統計各種不同事件出現的次數。例如,用戶可能想要統計已經處理過的單詞的數目或者德國文件的索引數量。
為了使用這一特性,用戶代碼創建一個命名的counter對象,並且在Map以及Reduce函數中對counter進行增加。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase")
map(String name, String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
每個worker機器上counter的值會定期傳給master(捎帶在給master的ping回復中)。master將來自成功執行的Map和Reduce task的counter值聚集起來。然后在MapReduce操作完成之后返回給用戶代碼。當前的counter值也會顯示在master的狀態頁上,所以用戶能從實時觀看計算的進行。在聚集counter的值的時候,master會消除Map或者Reduce task的重復執行造成的重復計算。(重復執行可能由backup tasks或者因為錯誤重新執行的task引起)。
有些counter的值是由MapReduce庫自動維護的,例如已經處理的輸入鍵值對數目以及已經產生的輸出鍵值對數目。
用戶發現counter特性對於檢查MapReduce操作的執行是非常有用的。例如,在有些MapReduce操作中,用戶代碼想要確保產生的輸出對的數目和已經處理的輸入對的數目是恰好相等的,或者處理的德語文件的數目占總處理文件數目的比重在一個可容忍的范圍內。
5 Performance
在這個section中,我們通過運行在一個集群上的兩個computation來測試MapReduce的性能。一個Computation搜索一個T的數據,從中獲取一個特定的模式。另一個computation對一個T的數據進行排序。
這兩個程序代表了由用戶實際編寫的MapReduce程序的一個子集------一類程序用於將數據從一種表示方法切換到另一種表示方法。另一類程序則從大數據集中抽取出一小部分有趣的數據。
5.1 Cluster Configuration
所有程序都運行在一個由1800台機器組成的機器上。每一台機器都有兩個2GHz 的Intel Xeon處理器,並且Hyper-Threading打開, 4GB內存,兩個160GB的IDE磁盤,以及一個G的以太網鏈路。這些機器被安排在一個兩層樹狀的交換網絡中,根節點的帶寬大概在100-200Gbps。因為所有機器都在同一個托管設備中,因此任意兩台機器見的通信時間少於1ms。
其中4GB中的1-1.5G是為集群中運行的其他任務預留的。程序在一個周末的下午運行,此時CPU,磁盤,網絡基本都處於空閑狀態。
5.2 Grep
grep程序需要掃描10的十次方條100-byte的記錄,搜索一個相對罕見的三字符模式(出現了92337次)。輸入被分成大概64MB份(M = 15000),所有的輸出文件都存放在一個文件中(R = 1)。
Figure 2顯示了Computation隨着時間的變化過程。Y軸代表了輸入數據的掃描速度。隨着機器逐漸加入MapReduce的計算當中,速度越來越快,當有1764個worker加入時,達到峰值30GB/s。隨着Map task的結束,速度開始下降並且在80s的時候到達0,。整個Computation從開始到結束總共花費了大概150s。這其中還包括了1分鍾的啟動開銷。開銷主要來源於將程序分發到worker machine中,和GFS交互並打開1000個輸入文件,以及獲取局部性優化所需的信息的延時。
5.3 Sort
排序程序用於對10的十次方條記錄(大概1T的數據)進行排序。程序以TeraSort benchmark為模型。