本文為senlie原創,轉載請保留此地址:http://www.cnblogs.com/senlie/
1.概要
很多計算在概念上很直觀,但由於輸入數據很大,為了能在合理的時間內完成,這些計算
必須分布在數以百計數以千計的機器上。例如處理爬取得到的文檔、網頁請求日志來計算
各種衍生數據,如倒排索引,網頁文檔的各種圖結構表示,從每個主機上爬取的文檔數,
在某一天最頻繁的查詢的集合。
MapReduce 是為處理和生成大數據集的編程模式和相應的實現。
用戶指定一個 map 函數來處理一個鍵值對來生成一個鍵值對的集合,
和一個 reduce 函數來合並具有相同中間鍵的實值。
例如,有大一堆文檔,要統計里面每一個文檔的出現的次數。可以這樣寫map 函數和 reduce 函數
map(String key, String value): //key: document name //value: document contents for each word w in value: EmitIntermediate(w, '1'); reduce(String key, Iterator values): //key: a word //values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
??疑問:map 返回的是一個 key/value ,為什么到了 resuce 這的輸入卻變成了 key/list of values ,這中間
發生了什么?
解答:
map 函數接受一個鍵值對(如上面例子中的文檔名/文檔內容)並產生一組鍵值對(單詞/1)。在將這組
鍵值對傳給 reduce 函數之前, MapReduce 庫會組合所有具有相同鍵值的實值產生新的一組鍵/值(單詞/次數)。
reduce 函數接受來自多個 map 函數產生的鍵值對,它們在被 reduce 函數處理前,會先被 MapReduce 庫組合成
鍵/值列表(單詞/次數列表)。下圖解釋了這一過程。
(聲明:圖來自實驗室 adonis 同學的 seminar 展示ppt)
2.MapReduce 的執行的大概流程
通過將輸入數據划分為 M 個分片, map 函數的調用分布在多台機器上,這些分片可同
不同的機器並行地處理。
通過將中間結果的鍵空間划分為 R 個分片, reduce 函數的調用分布在多台機器上。
下圖展示了 MapReduce 操作的整個流程。
1). 客戶程序中的 MapReduce 庫首先將輸入文件分成 M 個大小通常為 16MB 或者64MB 的分片。
然后開始在集群上的機器復制客戶程序
2).其中有一個程序的備份是特殊的,它就是主節點。其它的是由主節點分配任務的從節點。
主節點有 M 個 map 任務和 R 個 reduce 任務要分配給那些空閑的從節點。
3).一個被分配了 map 任務的從節點從輸入分片中讀取內容,然后從輸入中解析出鍵值對被傳遞給
用戶定義的 map 函數,由它來產生中間結果的鍵值對並緩存在內存中
4).在內存中的鍵值對被周期性地寫入到本地磁盤,通過分片函數被分成 R 個分片。
這些分片的位置被回傳給主節點,由主節點告訴 reduce 從節點它們的位置
5).當 reduce 從節點被主節點告知分片的位置時,它從使用 RPC(remote procedure call) 去讀取
那些緩存數據,當讀完后,它會按鍵值進行排序,然后將有相同鍵值的鍵值對組合在一起,形成鍵/值列表
6).reduce 從節點遍歷已經排序合並好了的中間數據,將每一個鍵/值列表對傳遞給客戶定義的 reduce 函數。
reduce 函數返回的結果被添加到這個 reduce 從節點的結果文件中。
7).當所有 map 從節點和 reduce 從節點完成后,主節點喚醒客戶程序。
如果 MapReduce 程序成功完成,結果文件被存儲在 R 個輸出文件中。
3.示例
這個示例統計了一組輸入文件里每個單詞的出現次數
#include "mapreduce/mapreduce.h" //user's map function class WordCounter : public Mapper{ public: virtual void Map(const MapInput &input){ const string &text = input.value(); const int n = text.size(); for(int i = 0; i < n; ){ //忽略單詞前空格 while(i < n && isspace(text[i])) i++; //找到單詞的結尾 int start = i; while(i < n && !isspace(text[i])) i++; if(start < i) Emit(text.substr(start, i - start), "1"); } } }; REGISTER_MAPPER(WordCounter); // 這個是干嘛用的?? //User's reduce function class Adder : public Reducer { // 這里不用加個 public 的關鍵字? virtual void Reduce(ReduceInput *input){ //把有相同鍵值的數值加起來 int64 value = 0; while(!input->done()){ value != StringToInt(input->value()); input->NextValue(); } Emit(IntToString(value)); } } REGISTER_REDUCER(Adder); int main(int argc, char **argv){ ParseCommandLineFlags(argc, argv); MapReduceSpecification spec; //把輸入文件列表存入 "spec" for(int i = 1; i < argc; i++){ MapReduceInput *input = spec.add_input(); input->set_format("text"); input->set_filepattern(argv[i]); input->set_mapper_class("WordCounter"); } //指定輸出文件 MapReduceOutput *out = spec.output(); out->set_filebase("gfs/test/freq"); out->set_num_tasks(100); out->set_format("text"); out->set_reducer_class("Adder"); //可選:在 map 節點中做部分和運算以節省帶寬 out->set_combiner_class("Adder"); //調節參數:使用最多2000台機器,每個任務最多100MB內存 spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100); //開跑 MapReduceResult result; if(!MapReduce(spec, &result)) abort(); //失敗的時候 abort, 能運行在這里就是成功了。 return 0; }
參考:
MapReduce: Simplified Data Processing on Large Clusters