1.streaming的作用
Haoop支持用其他語言來編程,需要用到名為Streaming的通用API。
Streaming主要用於編寫簡單,短小的MapReduce程序,可以通過腳本語言編程,開發更快捷,並充分利用非Java庫。
HadoopStreaming使用Unix中的流與程序交互,從stdin輸入數據,從stdout輸出數據。實際上可以用任何命令作為mapper和reducer。數據流示意如下:
cat [intput_file] | [mapper] | sort | [reducer] > [output_file]
2.使用方法
使用如下命令:
hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar \
>-input cite75_99.txt \
> -output output \
>-mapper 'cut -f 2 -d ,' \
>-reducer 'uniq'
第一行表示使用的StreamingAPI,位於圖中位置得jar包中
-input和 -output參數用於設置輸入輸出文件或目錄
-mapper和-reducer通過引號中得參數進行設定,分別進行了截取第二列數據,
uniq進行了排序去重。
注意:每行是完全按照字母方式排序,因為Streaming完全采用文本方式處理數據,而不知道其他得數據類型。輸出結果如下:
kqiao@ubuntu:~/hadoop-0.20.203.0$ hadoop fs -cat outputStreaming/part-00000 | head -10
"CITED"
1
10000
100000
1000006
1000007
1000011
1000017
1000026
1000033
......
3.用Streaming處理鍵值對
默認情況下,Streaming使用\t分離記錄中得鍵和值,當沒有\t時,整個記錄被視為鍵,值為空白文本。
不同於AttributeMax.py為每個鍵尋找最大值,這次我們試着為每個國家找到專利聲明數的平均值。(Hadoop包含得名為Aggregate包,可以為每個鍵尋找最大值)
(1)Streaming中得mapper通過STDIN讀取一個 分片,並將每一行提取為一個記錄。Mapper可以選擇是把每條記錄翻譯為一個鍵值對,還是一行文本
此步從 輸入文件到<k1,v1>
(2) 對於mapper輸出的每一行,Streaming API將之翻譯為用\t分隔的鍵值對,類似於MapReduce中的划分,可以用pationer來處理鍵。最終所有鍵一致的key/value進入相同reducer。
(3)沒個reducer以鍵為基准排序鍵值對,如同在Java模式中, 相同鍵的鍵值對被結組為一個鍵和一列值。reducer處理這些分組。
(4)在實踐中,reducer的輸出(STDOUT)被寫入到一個文件中(由-output指定)
對AverageByAttributeMaper.py:
<<<<<<<<<<<<<<<<<<<Mapper daima>>>>>>>>>>>>>>>>>>>
- 無reducer方式運行時:-D mapred.reducer.tasks=0
輸出由行組成:一個國家代碼 \t 一個計數值 , 並且其順序與輸入記錄一致
- 以IdentityReducer方式運行,設置-D mapred.reducer.tasks=1(這種方式只要不設置-reducer選項即可)
執行結束可以看到 雖然每行的內容與上一個相同,但是順序被重排,鍵相同的“結組”在一起。可以根據這些信息考慮自己的reducer設計:
AverageByAttributeReducer.py——將相同鍵的值求和計數,在遇到新的鍵或到文件尾時,計算前一個鍵的平均值並輸出到STDOUT中。
<<<<<<<<<<<<<<<<<<<<<<Reducer daima>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
這時可以運行完整的MapReduce作業。會得到真正的平均值文件。
4.通過Aggregate包使用Streaming
Hadoop有一個成為Aggregate軟件包,極大簡化數據集的匯總統計。尤其在使用Streaming時。
在Streaming中Aggregate包作為Reducer做聚集統計,只需提供一個mapper處理記錄並以特定格式輸出。輸出每行如下:
function:key\tvalue
function為值聚合函數的名稱(由Aggregate包中預定義函數獲得),接着一組 鍵值對,值聚合函數列表如下:
如果要計算每年授權的專利數,考慮編寫MapReduce程序的方法:
可以使mapper的輸出將年設置為key,而value的值恆為1。這樣reducer只需要對所有的1求和即可。使用基於Aggregate包的Streaming來實現:AttributeCount.py
#!/usr/bin/env python import sys index = int(sys.argv[1]) for line in sys.stdin: fields = line.split(",") print "LongValueSum:" + fields[index] + "\t" + "1"
關鍵語句:print "LongValueSum:" + fields[index] + "\t" + "1"。。按指定格式( function:key\tvalue) 打印到輸出!
運行如下:
hadoop jar contrib/streaming/hadoop-streaming-0.20.203.jar \ -file AttributeCount.py \ -input apat63_99.txt \ -output output \ -mapper 'AttributeCount.py 1' \ -reducer aggregate