MapReduce編程模型
在Google的一篇重要的論文MapReduce: Simplified Data Processing on Large Clusters中提到,Google公司有大量的諸如Web請求日志、爬蟲抓取的文檔之類的數據需要處理,由於數據量巨大,只能將其分散在成百上千台機器上處理,如何處理並行計算、如何分發數據、如何處理錯誤,所有這些問題綜合在一起,需要大量的代碼處理,因此也使得原本簡單的運算變得難以處理。
為了解決上述復雜的問題,Google設計一個新的抽象模型,使用這個抽象模型,只要表述想要執行的簡單運算即可,而不必關心並行計算、容錯、數據分布、負載均衡等復雜的細節,這些問題都被封裝在了一個庫里面。設計這個抽象模型的靈感來自Lisp和許多其他函數式語言的Map和Reduce原語,為了加深理解,現以Python的Map和Reduce為例演示如下:
Python內置了map函數,該函數接受兩個參數,第一個參數是一個函數對象,第二個參數是一個序列,執行map函數后,序列中的每個元素都會被按照第一個參數指定的函數制定的規則執行一遍后生成一個新的序列,請看如下:
def char2int(c):
return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[c]
if __name__ == '__main__':
result = map(char2int,'123456')
print(list(result))
python中的字符串本身也是序列,執行map函數后,每個子字符串適應char2int指定的規則,生成了一個新的序列 [1, 2, 3, 4, 5, 6]。
python還有reduce函數,該函數同樣接受兩個參數,第一個參數也是函數對象,假設名為f,該函數必須接受兩個參數,第二參數是要處理的序列,執行reduce函數后,首先,f函數處理序列中的頭兩個元素,把處理的結果和序列中的第三個元素再次作為參數處理,直到處理完所有的序列里的元素:
reduce(f, [x1, x2, x3, x4]) = f(f(f(x1, x2), x3), x4)
請看示例:
from functools import reduce
def char2int(c):
return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[c]
def calculate(x, y):
return x* 10 + y
if __name__ == '__main__':
result = reduce(calculate, map(char2int,'123456'))
print(result)
其輸出結果是123456,執行上面的map和reduce,結果就是將一個字符串轉行為了數字,map首先將序列中的每個子字符串轉換為數字,redcue遍歷map生成的臨時序列,並將臨時序列中的元素依次按照函數calculate制定的規則處理,且將第一次處理的結果作為下次處理的一個輸入參數。
Hadoop中的MapReduce
執行流程
Hadoop中的MapReduce編程模型借鑒了函數式語言中的map和reduce函數,並且將執行過程放在了多個計算機上並行執行,大概的流程如下:
- 源數據分片
- map函數在集群中並行處理分片
- 對map任務執行產生的結果分區,每個分區的數據交給一個reduce處理
數據分片
Hadoop將源文件分片,然后將分片分發到集群中的節點上等待map函數對其進行處理,每個分片的大小應以趨向於HDFS的一個block的大小為宜,如果分的太小,管理分片和構建map任務將會花去較長時間,如果分的太大,可能會出現一個分片的數據存放在兩台節點中的情況,這樣執行map任務時數據必須在不同的節點中通過網絡進行傳遞,浪費了時間。
執行map任務
Hadoop會在放置分片的節點上執行map任務,將執行后產生的文件放到本地磁盤,但不是放到HDFS中,因為這個文件僅僅是中間結果,還要將其傳遞給reduce任務處理,沒有必要存放在HDFS,因為存放在HDFS的話需要節點直接通過網絡傳遞數據以備份多份,從而造成了空間和時間的浪費。如果map任務失敗了,hadoop會從新分配節點執行。
不知道是不是有讀者會想到SETI@home,這是一個尋找外星文明的科學計划,方法是分析射電望遠鏡數據,但是由於數據量巨大,所有組織者在全球尋找了很多自願者,自願者在自己機器上安裝一個軟件,該軟件會下載數據並且在電腦空閑時分析數據並將結果提交。筆者曾經也想為尋找外星人貢獻一份力量,於是也下載了軟件,運行了一段時間,它還提供一個電腦屏保,顯示的是當前正在處理的數據,包括動態的柱形圖等,還挺有意思,最后筆者感覺電腦很卡,無情的放棄了尋找外星人的遠大理想,那時候我還有點愧疚,擔心會不會外星人就隱藏在因為我卸載軟件而沒有分析到的那部分我已經下載的數據,現在想想那時還是圖樣圖森破,人家一定有記錄,超過一定時間沒有提交一定會把任務重新分配給別的自願者的。SETI@home和Hadoop的分布式計算非常非常的像,不同的是SETI@home把分片發給了節點,節點會根據軟件安裝時設定好的程序分析這些分片。而Hadoop是把分析規則(用戶自定義的map和reduce)分配給具有分片的節點上,然后用這個規則分析分片。
執行reduce任務
如果有多個reduce任務,每個map任務會將輸出結果分區,即為每個reduce任務創建一個分區,每個分區由一些鍵值對組成,reduce完成處理后將數據寫入HDFS。請看如下圖(來自《Hadoop權威指南》):

其中,虛線框表示節點,虛線箭頭表示節點內部的數據傳輸,實線箭頭表示節點直接的數據傳輸。
一個MapReduce程序示例
《Hadoop實戰》中的第一個程序,分析美國從1975年到1999年之間的專利引用情況,源文件下載地址:http://www.nber.org/patents/Cite75_99.txt,格式如下:
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
3858241,3634889
3858242,1515701
3858242,3319261
3858242,3668705
3858242,3707004
3858243,2949611
3858243,3146465
3858243,3156927
3858243,3221341
3858243,3574238
3858243,3681785
3858243,3684611
3858244,14040
其中,第一列為引用專利編號,第二列為被引用專利編號,我們的目的是統計專利被哪些專利引用了,最后生成文件格式如下:
1 33964859,4647229
10000 3539112
第一列為專利號,第二列為引用第一列的專利的專利的專利號。程序如下:
package joey.cnblogs.patent;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PatentJob {
public static class Map extends Mapper<Text, Text, Text, Text> {
@Override
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, key);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {
String citing = "";
while(values.hasNext()){
if(citing.length() > 0){
citing += ",";
}
citing += values.next().toString();
}
context.write(key, new Text(citing));
}
}
public static void run(String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
Job job = new Job(conf, "PatentJob");
job.setJarByClass(PatentJob.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
run(args[0], args[1]);
}
}
關於如何在偽分布式上面運行程序以及如何從HDFS上面讀取寫入文件請參考我寫的前兩篇文章:Hadoop學習之旅一:Hello Hadoop和Hadoop學習之旅二:HDFS。現說明編寫MapReduce程序的幾個比較重要的點:
參數類型
map接收類型為K1,V1;map的輸出類型為K2,V2;reduce的接收類型為K2,List[V2];reduce的輸出類型為K3,V3,也就是說reduce的輸入類型必須和map的輸出類型一致。通常可以看見很多XXXWritable之類的,對應標准的XXX,加了Wriable后綴說明這個類型和Hadoop序列化有關,String對應的類型叫Text,定規則的人也比較奇葩把,為什么不叫StringWritable呢?如果大家要在map或reduce函數中使用自己的類型,請別忘了讓這個類型符合hadoop序列化的要求,至於怎么做在次不做討論,無非就是繼承一個XXX類,或實現XXX接口之類的。
map的輸入類型
map的輸入類型有由job.setInputFormat函數指定,如果此處指定KeyValueTextInputFormat,那么每個map任務將執行源文件中的一行數據,且以被分隔符分割的第一個字符串為key值,如果此處指定了TextInputFormat(TextInputFormat也是默認的InputFormat),那么map任務每次還是執行源文件中的一行,但是key為該行在文件中的字節偏移量,所以K1的類型宜為LongWritable,還有一個處理文本文件的InputFormat是NLineInputFormat,看名字就知道大概什么意思。剩下的是一些處理二進制文件的InputFormat,此處暫時不再探討,估計以后也不會再探討了,Java實在是太難用了,受不了,而且也不會有公司讓我這樣的菜鳥去搞大數據的。
關於combiner函數
combiner函數可以減少reduce的負擔,以及減少數據在節點中的傳輸量,因為combiner是在map任務所在節點執行的,對map的輸出進行合並,執行后的結果才會發送給reduce所在節點,combiner的實現通常和reduce是一樣的。需要注意的是,不是所有的場景都適合combiner,比如求平均值就不適合(出自《Hadoop權威指南》):
mean(0,20,10,25,15) = 14
mean(mean(0,20,10), mean(25,15)) = mean(10,20) = 15
后記
上面那個程序,其實是有問題的,我第一次編譯的時候沒有使用@Override關鍵字,結果和我想象的不同,后來加了@Override后發現reduce方法編譯失敗了,我實在是想不到有什么錯,對自己的智商都有所懷疑了,如果誰有興趣運行一下並發現問題的話請告訴我,不甚感激。
比起C#,Java真的很難用,我用C#做東西,幾乎不看文檔就能猜到類庫的意圖,Java很是費勁,最無語的是Hadoop中新API和舊API中的命名,同一個名字一會兒是類,一會兒又是接口,還有好幾個類新API和舊API中的名字都一樣,命名空間不同,很容易搞混的,真的很服了。
參考資料
- 《Hadoop權威指南》
- 《Hadoop實戰》
- 網易雲課堂:大數據工程師
- MapReduce: Simplified Data Processing on Large Clusters
- 廖雪峰的Python教程
