大數據技術 —— MapReduce 簡介


本文為senlie原創,轉載請保留此地址:http://www.cnblogs.com/senlie/

1.概要
很多計算在概念上很直觀,但由於輸入數據很大,為了能在合理的時間內完成,這些計算
必須分布在數以百計數以千計的機器上。例如處理爬取得到的文檔、網頁請求日志來計算
各種衍生數據,如倒排索引,網頁文檔的各種圖結構表示,從每個主機上爬取的文檔數,
在某一天最頻繁的查詢的集合。

MapReduce 是為處理和生成大數據集的編程模式和相應的實現。
用戶指定一個 map 函數來處理一個鍵值對來生成一個鍵值對的集合,
和一個 reduce 函數來合並具有相同中間鍵的實值。

例如,有大一堆文檔,要統計里面每一個文檔的出現的次數。可以這樣寫map 函數和 reduce 函數

map(String key, String value):
	//key: document name
	//value: document contents
	for each word w in value:
		EmitIntermediate(w, '1');
reduce(String key, Iterator values):
	//key: a word
	//values: a list of counts
	int result = 0;
	for each v in values:
		result += ParseInt(v);
	Emit(AsString(result));

  

??疑問:map 返回的是一個 key/value ,為什么到了 resuce 這的輸入卻變成了 key/list of values ,這中間
發生了什么?
解答:
map 函數接受一個鍵值對(如上面例子中的文檔名/文檔內容)並產生一組鍵值對(單詞/1)。在將這組
鍵值對傳給 reduce 函數之前, MapReduce 庫會組合所有具有相同鍵值的實值產生新的一組鍵/值(單詞/次數)。
reduce 函數接受來自多個 map 函數產生的鍵值對,它們在被 reduce 函數處理前,會先被 MapReduce 庫組合成
鍵/值列表(單詞/次數列表)。下圖解釋了這一過程。
(聲明:圖來自實驗室 adonis 同學的 seminar 展示ppt)

2.MapReduce 的執行的大概流程
通過將輸入數據划分為 M 個分片, map 函數的調用分布在多台機器上,這些分片可同
不同的機器並行地處理。
通過將中間結果的鍵空間划分為 R 個分片, reduce 函數的調用分布在多台機器上。
下圖展示了 MapReduce 操作的整個流程。

1). 客戶程序中的 MapReduce 庫首先將輸入文件分成 M 個大小通常為 16MB 或者64MB 的分片。
然后開始在集群上的機器復制客戶程序
2).其中有一個程序的備份是特殊的,它就是主節點。其它的是由主節點分配任務的從節點。
主節點有 M 個 map 任務和 R 個 reduce 任務要分配給那些空閑的從節點。
3).一個被分配了 map 任務的從節點從輸入分片中讀取內容,然后從輸入中解析出鍵值對被傳遞給
用戶定義的 map 函數,由它來產生中間結果的鍵值對並緩存在內存中
4).在內存中的鍵值對被周期性地寫入到本地磁盤,通過分片函數被分成 R 個分片。
這些分片的位置被回傳給主節點,由主節點告訴 reduce 從節點它們的位置
5).當 reduce 從節點被主節點告知分片的位置時,它從使用 RPC(remote procedure call) 去讀取
那些緩存數據,當讀完后,它會按鍵值進行排序,然后將有相同鍵值的鍵值對組合在一起,形成鍵/值列表
6).reduce 從節點遍歷已經排序合並好了的中間數據,將每一個鍵/值列表對傳遞給客戶定義的 reduce 函數。
reduce 函數返回的結果被添加到這個 reduce 從節點的結果文件中。
7).當所有 map 從節點和 reduce 從節點完成后,主節點喚醒客戶程序。
如果 MapReduce 程序成功完成,結果文件被存儲在 R 個輸出文件中。

3.示例
這個示例統計了一組輸入文件里每個單詞的出現次數

#include "mapreduce/mapreduce.h"
//user's map function
class WordCounter : public Mapper{
public:
	virtual void Map(const MapInput &input){
		const string &text = input.value();
		const int n = text.size();
		for(int i = 0; i < n; ){
			//忽略單詞前空格
			while(i < n && isspace(text[i])) i++;
			//找到單詞的結尾
			int start = i;
			while(i < n && !isspace(text[i])) i++;
			if(start < i) Emit(text.substr(start, i - start), "1");
			
		}
	}
};
REGISTER_MAPPER(WordCounter); // 這個是干嘛用的??

//User's  reduce function 
class Adder : public Reducer {
	// 這里不用加個 public 的關鍵字?
	virtual void Reduce(ReduceInput *input){
		//把有相同鍵值的數值加起來
		int64 value = 0;
		while(!input->done()){
			value != StringToInt(input->value());
			input->NextValue();
		}
		Emit(IntToString(value));	
	}
}
REGISTER_REDUCER(Adder);

int main(int argc, char **argv){
	ParseCommandLineFlags(argc, argv);
	MapReduceSpecification spec;
	
	//把輸入文件列表存入 "spec"
	for(int i = 1; i < argc; i++){
		MapReduceInput *input = spec.add_input();
		input->set_format("text");
		input->set_filepattern(argv[i]);
		input->set_mapper_class("WordCounter");
	}
	//指定輸出文件
	MapReduceOutput *out = spec.output();
	out->set_filebase("gfs/test/freq");
	out->set_num_tasks(100);
	out->set_format("text");
	out->set_reducer_class("Adder");
	
	//可選:在 map 節點中做部分和運算以節省帶寬
	out->set_combiner_class("Adder");
	
	//調節參數:使用最多2000台機器,每個任務最多100MB內存
	spec.set_machines(2000);
	spec.set_map_megabytes(100);
	spec.set_reduce_megabytes(100);
	
	//開跑
	MapReduceResult result;
	if(!MapReduce(spec, &result)) abort();
	
	//失敗的時候 abort, 能運行在這里就是成功了。
	return 0;
}

  


參考:
MapReduce: Simplified Data Processing on Large Clusters


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM